You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/03/03 18:47:18 UTC

[12/17] drill git commit: DRILL-6099: Push limit past flatten(project) without pushdown into scan

DRILL-6099: Push limit past flatten(project) without pushdown into scan

closes #1096


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6a55b2b2
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6a55b2b2
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6a55b2b2

Branch: refs/heads/master
Commit: 6a55b2b21e047ba44f8f2d19381f18ae44263e26
Parents: 6af651f
Author: Gautam Parai <gp...@maprtech.com>
Authored: Thu Jan 18 15:46:42 2018 -0800
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Sat Mar 3 19:47:50 2018 +0200

----------------------------------------------------------------------
 .../apache/drill/exec/planner/PlannerPhase.java |  8 ++-
 .../exec/planner/common/DrillRelOptUtil.java    | 63 ++++++++++++++++++++
 .../exec/planner/logical/DrillLimitRel.java     |  6 +-
 .../logical/DrillPushLimitToScanRule.java       | 41 +++++++++----
 .../planner/sql/handlers/FindLimit0Visitor.java | 20 +------
 .../impl/flatten/TestFlattenPlanning.java       |  7 +++
 6 files changed, 112 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 18dfb35..f46a7ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -88,7 +88,7 @@ import java.util.List;
 public enum PlannerPhase {
   //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRuleSets.class);
 
-  LOGICAL_PRUNE_AND_JOIN("Loigcal Planning (with join and partition pruning)") {
+  LOGICAL_PRUNE_AND_JOIN("Logical Planning (with join and partition pruning)") {
     public RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins) {
       return PlannerPhase.mergedRuleSets(
           getDrillBasicRules(context),
@@ -274,6 +274,7 @@ public enum PlannerPhase {
        */
       DrillPushProjectPastFilterRule.INSTANCE,
       DrillPushProjectPastJoinRule.INSTANCE,
+
       // Due to infinite loop in planning (DRILL-3257), temporarily disable this rule
       //DrillProjectSetOpTransposeRule.INSTANCE,
       RuleInstance.PROJECT_WINDOW_TRANSPOSE_RULE,
@@ -342,8 +343,9 @@ public enum PlannerPhase {
             PruneScanRule.getDirFilterOnScan(optimizerRulesContext),
             ParquetPruneScanRule.getFilterOnProjectParquet(optimizerRulesContext),
             ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext),
-            DrillPushLimitToScanRule.LIMIT_ON_SCAN,
-            DrillPushLimitToScanRule.LIMIT_ON_PROJECT
+            // LIMIT_ON_PROJECT moves a limit below a project (e.g. onto a scan); LIMIT_ON_SCAN may depend on that
+            DrillPushLimitToScanRule.LIMIT_ON_PROJECT,
+            DrillPushLimitToScanRule.LIMIT_ON_SCAN
         )
         .build();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
index 91c33bd..d5c8d94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
@@ -30,9 +30,11 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexVisitor;
 import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import org.apache.calcite.util.Pair;
@@ -224,4 +226,65 @@ public abstract class DrillRelOptUtil {
     }
   }
 
+  /** Returns true if {@code fetch} is a BIGINT/INTEGER/DECIMAL literal equal to zero, i.e. a LIMIT 0. */
+  public static boolean isLimit0(RexNode fetch) {
+    if (fetch != null && fetch.isA(SqlKind.LITERAL)) {
+      RexLiteral l = (RexLiteral) fetch;
+      switch (l.getTypeName()) {
+        case BIGINT:
+        case INTEGER:
+        case DECIMAL:
+          return ((long) l.getValue2()) == 0;
+        default:
+          return false;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Find whether the given project rel can produce more rows than it receives (hence unknown rowcount).
+   * This happens if any project expression contains FLATTEN, even when nested inside another call.
+   * @param project : The project rel
+   * @return : Return true if the rowcount is unknown. Otherwise, false.
+   */
+  public static boolean isProjectOutputRowcountUnknown(RelNode project) {
+    return projectContainsFunction(project, "flatten");
+  }
+
+  /**
+   * Find whether the given project rel has unknown output schema. This would happen if the
+   * project has CONVERT_FROMJSON which can only derive the schema after evaluation is performed
+   * @param project : The project rel
+   * @return : Return true if the project output schema is unknown. Otherwise, false.
+   */
+  public static boolean isProjectOutputSchemaUnknown(RelNode project) {
+    return projectContainsFunction(project, "convert_fromjson");
+  }
+
+  /**
+   * Walks every expression of the project, including nested calls, looking for a call to the
+   * function {@code functionName} (operator names are compared case-insensitively).
+   */
+  private static boolean projectContainsFunction(RelNode project, final String functionName) {
+    assert project instanceof Project : "Rel is NOT an instance of project!";
+    RexVisitor<Void> visitor = new RexVisitorImpl<Void>(true) {
+      @Override
+      public Void visitCall(RexCall call) {
+        if (functionName.equalsIgnoreCase(call.getOperator().getName())) {
+          // Throw to interrupt the tree walk, as other utility methods in RexUtil.java do
+          throw new Util.FoundOne(call);
+        }
+        return super.visitCall(call);
+      }
+    };
+    try {
+      for (RexNode rex : ((Project) project).getProjects()) {
+        rex.accept(visitor);
+      }
+    } catch (Util.FoundOne e) {
+      return true;
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java
index 9faf070..bef8b2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java
@@ -38,9 +38,15 @@ public class DrillLimitRel extends DrillLimitRelBase implements DrillRel {
     super(cluster, traitSet, child, offset, fetch);
   }
 
+  /** @param pushDown whether this limit is marked as already pushed down; {@code DrillPushLimitToScanRule.LIMIT_ON_PROJECT} skips marked limits */
+  public DrillLimitRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch, boolean pushDown) {
+    super(cluster, traitSet, child, offset, fetch, pushDown);
+  }
+
   @Override
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-    return new DrillLimitRel(getCluster(), traitSet, sole(inputs), offset, fetch);
+    // Carry the pushDown mark over to copies so rules that check it still see it
+    return new DrillLimitRel(getCluster(), traitSet, sole(inputs), offset, fetch, isPushDown());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
index 9c06897..068252d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
@@ -23,7 +23,8 @@ import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
 import org.apache.calcite.rel.RelNode;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 
 public abstract class DrillPushLimitToScanRule extends RelOptRule {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillPushLimitToScanRule.class);
@@ -55,18 +56,24 @@ public abstract class DrillPushLimitToScanRule extends RelOptRule {
     }
   };
 
-  public static DrillPushLimitToScanRule LIMIT_ON_PROJECT =
-      new DrillPushLimitToScanRule(
-          RelOptHelper.some(DrillLimitRel.class, RelOptHelper.some(
-              DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
-          "DrillPushLimitToScanRule_LimitOnProject") {
+  public static DrillPushLimitToScanRule LIMIT_ON_PROJECT = new DrillPushLimitToScanRule(
+      RelOptHelper.some(DrillLimitRel.class, RelOptHelper.any(DrillProjectRel.class)), "DrillPushLimitToScanRule_LimitOnProject") {
     @Override
     public boolean matches(RelOptRuleCall call) {
       DrillLimitRel limitRel = call.rel(0);
-      DrillScanRel scanRel = call.rel(2);
-      // For now only applies to Parquet. And pushdown only apply limit but not offset,
+      DrillProjectRel projectRel = call.rel(1);
+      // Only a limit with a fetch is worth pushing (any offset moves down together with it),
       // so if getFetch() return null no need to run this rule.
-      if (scanRel.getGroupScan().supportsLimitPushdown() && (limitRel.getFetch() != null)) {
+      // Do not push across Project containing CONVERT_FROMJSON for limit 0 queries. For limit 0 queries, this would
+      // mess up the schema since Convert_FromJson() is different from other regular functions in that it only knows
+      // the output schema after evaluation is performed. When input has 0 row, Drill essentially does not have a way
+      // to know the output type.
+      // Do not push below a Project containing FLATTEN either: it changes the row count by an amount unknown at
+      // planning time, so a limit (and any offset) applied to its input would select the wrong rows.
+      if (!limitRel.isPushDown() && (limitRel.getFetch() != null)
+          && (!DrillRelOptUtil.isLimit0(limitRel.getFetch())
+            || !DrillRelOptUtil.isProjectOutputSchemaUnknown(projectRel))
+          && !DrillRelOptUtil.isProjectOutputRowcountUnknown(projectRel)) {
         return true;
       }
       return false;
@@ -76,12 +83,14 @@ public abstract class DrillPushLimitToScanRule extends RelOptRule {
     public void onMatch(RelOptRuleCall call) {
       DrillLimitRel limitRel = call.rel(0);
       DrillProjectRel projectRel = call.rel(1);
-      DrillScanRel scanRel = call.rel(2);
-      doOnMatch(call, limitRel, scanRel, projectRel);
+      // matches() rejects row-count-changing projects (FLATTEN), so the limit can simply move below the project.
+      RelNode child = projectRel.getInput();
+      final RelNode limitUnderProject = limitRel.copy(limitRel.getTraitSet(), ImmutableList.of(child));
+      final RelNode newProject = projectRel.copy(projectRel.getTraitSet(), ImmutableList.of(limitUnderProject));
+      call.transformTo(newProject);
     }
   };
 
-
   protected void doOnMatch(RelOptRuleCall call, DrillLimitRel limitRel, DrillScanRel scanRel, DrillProjectRel projectRel){
     try {
       final int rowCountRequested = (int) limitRel.getRows();
@@ -113,6 +122,5 @@ public abstract class DrillPushLimitToScanRule extends RelOptRule {
     }  catch (Exception e) {
       logger.warn("Exception while using the pruned partitions.", e);
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
index 166c350..03d5f75 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.logical.DrillDirectScanRel;
 import org.apache.drill.exec.planner.logical.DrillLimitRel;
 import org.apache.drill.exec.planner.logical.DrillRel;
@@ -135,24 +136,9 @@ public class FindLimit0Visitor extends RelShuttleImpl {
     return contains;
   }
 
-  private static boolean isLimit0(RexNode fetch) {
-    if (fetch != null && fetch.isA(SqlKind.LITERAL)) {
-      RexLiteral l = (RexLiteral) fetch;
-      switch (l.getTypeName()) {
-      case BIGINT:
-      case INTEGER:
-      case DECIMAL:
-        if (((long) l.getValue2()) == 0) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
   @Override
   public RelNode visit(LogicalSort sort) {
-    if (isLimit0(sort.fetch)) {
+    if (DrillRelOptUtil.isLimit0(sort.fetch)) {
       contains = true;
       return sort;
     }
@@ -163,7 +149,7 @@ public class FindLimit0Visitor extends RelShuttleImpl {
   @Override
   public RelNode visit(RelNode other) {
     if (other instanceof DrillLimitRel) {
-      if (isLimit0(((DrillLimitRel) other).getFetch())) {
+      if (DrillRelOptUtil.isLimit0(((DrillLimitRel) other).getFetch())) {
         contains = true;
         return other;
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java
index 1a5117f..0a28d69 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java
@@ -63,5 +63,12 @@ public class TestFlattenPlanning extends PlanTestBase {
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlans, excludedPlans);
   }
 
+  @Test // DRILL-6099 : the limit must stay above flatten(project), which can change the row count
+  public void testLimitPushdownPastFlatten() throws Exception {
+    final String query = "select rownum, flatten(complex) comp from cp.`store/json/test_flatten_mappify2.json` limit 1";
+    // (?s) lets '.' span plan lines, so this only matches if a Limit is printed before (i.e. above) the Flatten
+    final String[] expectedPatterns = {"(?s)Limit\\(fetch=\\[1\\]\\).*Flatten"};
+    PlanTestBase.testPlanMatchingPatterns(query, expectedPatterns, null);
+  }
 
 }