You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/03/03 18:47:18 UTC
[12/17] drill git commit: DRILL-6099: Push limit past flatten(project) without pushdown into scan
DRILL-6099: Push limit past flatten(project) without pushdown into scan
closes #1096
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6a55b2b2
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6a55b2b2
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6a55b2b2
Branch: refs/heads/master
Commit: 6a55b2b21e047ba44f8f2d19381f18ae44263e26
Parents: 6af651f
Author: Gautam Parai <gp...@maprtech.com>
Authored: Thu Jan 18 15:46:42 2018 -0800
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Sat Mar 3 19:47:50 2018 +0200
----------------------------------------------------------------------
.../apache/drill/exec/planner/PlannerPhase.java | 8 ++-
.../exec/planner/common/DrillRelOptUtil.java | 63 ++++++++++++++++++++
.../exec/planner/logical/DrillLimitRel.java | 6 +-
.../logical/DrillPushLimitToScanRule.java | 41 +++++++++----
.../planner/sql/handlers/FindLimit0Visitor.java | 20 +------
.../impl/flatten/TestFlattenPlanning.java | 7 +++
6 files changed, 112 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 18dfb35..f46a7ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -88,7 +88,7 @@ import java.util.List;
public enum PlannerPhase {
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRuleSets.class);
- LOGICAL_PRUNE_AND_JOIN("Loigcal Planning (with join and partition pruning)") {
+ LOGICAL_PRUNE_AND_JOIN("Logical Planning (with join and partition pruning)") {
public RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins) {
return PlannerPhase.mergedRuleSets(
getDrillBasicRules(context),
@@ -274,6 +274,7 @@ public enum PlannerPhase {
*/
DrillPushProjectPastFilterRule.INSTANCE,
DrillPushProjectPastJoinRule.INSTANCE,
+
// Due to infinite loop in planning (DRILL-3257), temporarily disable this rule
//DrillProjectSetOpTransposeRule.INSTANCE,
RuleInstance.PROJECT_WINDOW_TRANSPOSE_RULE,
@@ -342,8 +343,9 @@ public enum PlannerPhase {
PruneScanRule.getDirFilterOnScan(optimizerRulesContext),
ParquetPruneScanRule.getFilterOnProjectParquet(optimizerRulesContext),
ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext),
- DrillPushLimitToScanRule.LIMIT_ON_SCAN,
- DrillPushLimitToScanRule.LIMIT_ON_PROJECT
+ // Include LIMIT_ON_PROJECT since LIMIT_ON_SCAN may not work without it
+ DrillPushLimitToScanRule.LIMIT_ON_PROJECT,
+ DrillPushLimitToScanRule.LIMIT_ON_SCAN
)
.build();
http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
index 91c33bd..d5c8d94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
@@ -30,9 +30,11 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexVisitor;
import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.util.Pair;
@@ -224,4 +226,65 @@ public abstract class DrillRelOptUtil {
}
}
+ public static boolean isLimit0(RexNode fetch) {
+ if (fetch != null && fetch.isA(SqlKind.LITERAL)) {
+ RexLiteral l = (RexLiteral) fetch;
+ switch (l.getTypeName()) {
+ case BIGINT:
+ case INTEGER:
+ case DECIMAL:
+ if (((long) l.getValue2()) == 0) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Find whether the given project rel can produce non-scalar output (hence unknown rowcount). This
+ * would happen if the project has a flatten
+ * @param project : The project rel
+ * @return : Return true if the rowcount is unknown. Otherwise, false.
+ */
+ public static boolean isProjectOutputRowcountUnknown(RelNode project) {
+ assert project instanceof Project : "Rel is NOT an instance of project!";
+ for (RexNode rex : project.getChildExps()) {
+ if (rex instanceof RexCall) {
+ if ("flatten".equals(((RexCall) rex).getOperator().getName().toLowerCase())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Find whether the given project rel has unknown output schema. This would happen if the
+ * project has CONVERT_FROMJSON which can only derive the schema after evaluation is performed
+ * @param project : The project rel
+ * @return : Return true if the project output schema is unknown. Otherwise, false.
+ */
+ public static boolean isProjectOutputSchemaUnknown(RelNode project) {
+ assert project instanceof Project : "Rel is NOT an instance of project!";
+ try {
+ RexVisitor<Void> visitor =
+ new RexVisitorImpl<Void>(true) {
+ public Void visitCall(RexCall call) {
+ if ("convert_fromjson".equals(call.getOperator().getName().toLowerCase())) {
+ throw new Util.FoundOne(call); /* throw exception to interrupt tree walk (this is similar to
+ other utility methods in RexUtil.java */
+ }
+ return super.visitCall(call);
+ }
+ };
+ for (RexNode rex : ((Project) project).getProjects()) {
+ rex.accept(visitor);
+ }
+ } catch (Util.FoundOne e) {
+ Util.swallow(e, null);
+ return true;
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java
index 9faf070..bef8b2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java
@@ -38,9 +38,13 @@ public class DrillLimitRel extends DrillLimitRelBase implements DrillRel {
super(cluster, traitSet, child, offset, fetch);
}
+ public DrillLimitRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch, boolean pushDown) {
+ super(cluster, traitSet, child, offset, fetch, pushDown);
+ }
+
@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new DrillLimitRel(getCluster(), traitSet, sole(inputs), offset, fetch);
+ return new DrillLimitRel(getCluster(), traitSet, sole(inputs), offset, fetch, isPushDown());
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
index 9c06897..068252d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
@@ -23,7 +23,14 @@ import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.util.Util;
import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
public abstract class DrillPushLimitToScanRule extends RelOptRule {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillPushLimitToScanRule.class);
@@ -55,18 +62,21 @@ public abstract class DrillPushLimitToScanRule extends RelOptRule {
}
};
- public static DrillPushLimitToScanRule LIMIT_ON_PROJECT =
- new DrillPushLimitToScanRule(
- RelOptHelper.some(DrillLimitRel.class, RelOptHelper.some(
- DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
- "DrillPushLimitToScanRule_LimitOnProject") {
+ public static DrillPushLimitToScanRule LIMIT_ON_PROJECT = new DrillPushLimitToScanRule(
+ RelOptHelper.some(DrillLimitRel.class, RelOptHelper.any(DrillProjectRel.class)), "DrillPushLimitToScanRule_LimitOnProject") {
@Override
public boolean matches(RelOptRuleCall call) {
DrillLimitRel limitRel = call.rel(0);
- DrillScanRel scanRel = call.rel(2);
- // For now only applies to Parquet. And pushdown only apply limit but not offset,
+ DrillProjectRel projectRel = call.rel(1);
+ // pushdown only apply limit but not offset,
// so if getFetch() return null no need to run this rule.
- if (scanRel.getGroupScan().supportsLimitPushdown() && (limitRel.getFetch() != null)) {
+ // Do not push across Project containing CONVERT_FROMJSON for limit 0 queries. For limit 0 queries, this would
+ // mess up the schema since Convert_FromJson() is different from other regular functions in that it only knows
+ // the output schema after evaluation is performed. When input has 0 row, Drill essentially does not have a way
+ // to know the output type.
+ if (!limitRel.isPushDown() && (limitRel.getFetch() != null)
+ && (!DrillRelOptUtil.isLimit0(limitRel.getFetch())
+ || !DrillRelOptUtil.isProjectOutputSchemaUnknown(projectRel))) {
return true;
}
return false;
@@ -76,12 +86,20 @@ public abstract class DrillPushLimitToScanRule extends RelOptRule {
public void onMatch(RelOptRuleCall call) {
DrillLimitRel limitRel = call.rel(0);
DrillProjectRel projectRel = call.rel(1);
- DrillScanRel scanRel = call.rel(2);
- doOnMatch(call, limitRel, scanRel, projectRel);
+ RelNode child = projectRel.getInput();
+ final RelNode limitUnderProject = limitRel.copy(limitRel.getTraitSet(), ImmutableList.of(child));
+ final RelNode newProject = projectRel.copy(projectRel.getTraitSet(), ImmutableList.of(limitUnderProject));
+ if (DrillRelOptUtil.isProjectOutputRowcountUnknown(projectRel)) {
+ //Preserve limit above the project since Flatten can produce more rows. Also mark it so we do not fire the rule again.
+ final RelNode limitAboveProject = new DrillLimitRel(limitRel.getCluster(), limitRel.getTraitSet(), newProject,
+ limitRel.getOffset(), limitRel.getFetch(), true);
+ call.transformTo(limitAboveProject);
+ } else {
+ call.transformTo(newProject);
+ }
}
};
-
protected void doOnMatch(RelOptRuleCall call, DrillLimitRel limitRel, DrillScanRel scanRel, DrillProjectRel projectRel){
try {
final int rowCountRequested = (int) limitRel.getRows();
@@ -113,6 +131,5 @@ public abstract class DrillPushLimitToScanRule extends RelOptRule {
} catch (Exception e) {
logger.warn("Exception while using the pruned partitions.", e);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
index 166c350..03d5f75 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.logical.DrillDirectScanRel;
import org.apache.drill.exec.planner.logical.DrillLimitRel;
import org.apache.drill.exec.planner.logical.DrillRel;
@@ -135,24 +136,9 @@ public class FindLimit0Visitor extends RelShuttleImpl {
return contains;
}
- private static boolean isLimit0(RexNode fetch) {
- if (fetch != null && fetch.isA(SqlKind.LITERAL)) {
- RexLiteral l = (RexLiteral) fetch;
- switch (l.getTypeName()) {
- case BIGINT:
- case INTEGER:
- case DECIMAL:
- if (((long) l.getValue2()) == 0) {
- return true;
- }
- }
- }
- return false;
- }
-
@Override
public RelNode visit(LogicalSort sort) {
- if (isLimit0(sort.fetch)) {
+ if (DrillRelOptUtil.isLimit0(sort.fetch)) {
contains = true;
return sort;
}
@@ -163,7 +149,7 @@ public class FindLimit0Visitor extends RelShuttleImpl {
@Override
public RelNode visit(RelNode other) {
if (other instanceof DrillLimitRel) {
- if (isLimit0(((DrillLimitRel) other).getFetch())) {
+ if (DrillRelOptUtil.isLimit0(((DrillLimitRel) other).getFetch())) {
contains = true;
return other;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6a55b2b2/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java
index 1a5117f..0a28d69 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java
@@ -63,5 +63,12 @@ public class TestFlattenPlanning extends PlanTestBase {
PlanTestBase.testPlanMatchingPatterns(query, expectedPlans, excludedPlans);
}
+ @Test // DRILL-6099 : push limit past flatten(project)
+ public void testLimitPushdownPastFlatten() throws Exception {
+ final String query = "select rownum, flatten(complex) comp from cp.`store/json/test_flatten_mappify2.json` limit 1";
+ final String[] expectedPatterns = {".*Limit\\(fetch=\\[1\\]\\).*",".*Flatten.*",".*Limit\\(fetch=\\[1\\]\\).*"};
+ final String[] excludedPatterns = null;
+ PlanTestBase.testPlanMatchingPatterns(query, expectedPatterns, excludedPatterns);
+ }
}