You are viewing a plain text version of this content. The canonical link for it (the "here" hyperlink on the original page) is not preserved in this plain-text copy.
Posted to commits@drill.apache.org by jn...@apache.org on 2015/09/26 08:21:03 UTC
drill git commit: DRILL-1457: Push Limit past through UnionExchange.
Repository: drill
Updated Branches:
refs/heads/master b8573526d -> 846df965e
DRILL-1457: Push Limit past through UnionExchange.
Close apache/drill#169
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/846df965
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/846df965
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/846df965
Branch: refs/heads/master
Commit: 846df965ea85c4712007ef6a6e9a78f396aa50e4
Parents: b857352
Author: Jinfeng Ni <jn...@apache.org>
Authored: Mon Sep 21 22:42:23 2015 -0700
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Fri Sep 25 23:16:38 2015 -0700
----------------------------------------------------------------------
.../exec/planner/common/DrillLimitRelBase.java | 30 ++++++-
.../exec/planner/logical/DrillRuleSets.java | 2 +
.../drill/exec/planner/physical/LimitPrel.java | 6 +-
.../LimitUnionExchangeTransposeRule.java | 66 ++++++++++++++++
.../impl/limit/TestLimitWithExchanges.java | 83 ++++++++++++++++++++
5 files changed, 183 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/846df965/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java
index a19dff7..8c21c4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java
@@ -36,11 +36,21 @@ import org.apache.calcite.rex.RexNode;
public abstract class DrillLimitRelBase extends SingleRel implements DrillRelNode {
protected RexNode offset;
protected RexNode fetch;
+ private boolean pushDown; // whether limit has been pushed past its child.
+ // Limit is special in that when it's pushed down, the original LIMIT still remains.
+ // Once the limit is pushed down, this flag will be TRUE for the original LIMIT
+ // and be FALSE for the pushed down LIMIT.
+ // This flag will prevent optimization rules to fire in a loop.
public DrillLimitRelBase(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch) {
+ this(cluster, traitSet, child, offset, fetch, false);
+ }
+
+ public DrillLimitRelBase(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch, boolean pushDown) {
super(cluster, traitSet, child);
this.offset = offset;
this.fetch = fetch;
+ this.pushDown = pushDown;
}
public RexNode getOffset() {
@@ -57,9 +67,7 @@ public abstract class DrillLimitRelBase extends SingleRel implements DrillRelNod
return super.computeSelfCost(planner).multiplyBy(.1);
}
- int off = offset != null ? RexLiteral.intValue(offset) : 0 ;
- int f = fetch != null ? RexLiteral.intValue(fetch) : 0 ;
- double numRows = off + f;
+ double numRows = getRows();
double cpuCost = DrillCostBase.COMPARE_CPU_COST * numRows;
DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
return costFactory.makeCost(numRows, cpuCost, 0, 0);
@@ -73,4 +81,20 @@ public abstract class DrillLimitRelBase extends SingleRel implements DrillRelNod
return pw;
}
+ @Override
+ public double getRows() {
+ int off = offset != null ? RexLiteral.intValue(offset) : 0 ;
+
+ if (fetch == null) {
+ return getInput().getRows() - off;
+ } else {
+ int f = RexLiteral.intValue(fetch);
+ return off + f;
+ }
+ }
+
+ public boolean isPushDown() {
+ return this.pushDown;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/846df965/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 3fa21db..65d6e89 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -45,6 +45,7 @@ import org.apache.drill.exec.planner.physical.FilterPrule;
import org.apache.drill.exec.planner.physical.HashAggPrule;
import org.apache.drill.exec.planner.physical.HashJoinPrule;
import org.apache.drill.exec.planner.physical.LimitPrule;
+import org.apache.drill.exec.planner.physical.LimitUnionExchangeTransposeRule;
import org.apache.drill.exec.planner.physical.MergeJoinPrule;
import org.apache.drill.exec.planner.physical.NestedLoopJoinPrule;
import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -221,6 +222,7 @@ public class DrillRuleSets {
ruleList.add(WriterPrule.INSTANCE);
ruleList.add(WindowPrule.INSTANCE);
ruleList.add(PushLimitToTopN.INSTANCE);
+ ruleList.add(LimitUnionExchangeTransposeRule.INSTANCE);
ruleList.add(UnionAllPrule.INSTANCE);
ruleList.add(ValuesPrule.INSTANCE);
http://git-wip-us.apache.org/repos/asf/drill/blob/846df965/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
index f3e9991..6cfbe2f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
@@ -38,9 +38,13 @@ public class LimitPrel extends DrillLimitRelBase implements Prel {
super(cluster, traitSet, child, offset, fetch);
}
+ public LimitPrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch, boolean pushDown) {
+ super(cluster, traitSet, child, offset, fetch, pushDown);
+ }
+
@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new LimitPrel(getCluster(), traitSet, sole(inputs), offset, fetch);
+ return new LimitPrel(getCluster(), traitSet, sole(inputs), offset, fetch, isPushDown());
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/846df965/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitUnionExchangeTransposeRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitUnionExchangeTransposeRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitUnionExchangeTransposeRule.java
new file mode 100644
index 0000000..25a7ce4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitUnionExchangeTransposeRule.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+
+import java.math.BigDecimal;
+
+public class LimitUnionExchangeTransposeRule extends Prule{
+ public static final RelOptRule INSTANCE = new LimitUnionExchangeTransposeRule();
+
+ private LimitUnionExchangeTransposeRule() {
+ super(RelOptHelper.some(LimitPrel.class, RelOptHelper.any(UnionExchangePrel.class)), "LimitUnionExchangeTransposeRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final LimitPrel limit = (LimitPrel) call.rel(0);
+
+ // Two situations we do not fire this rule:
+ // 1) limit has been pushed down to its child,
+ // 2) the fetch() is null (indicating we have to fetch all the remaining rows starting from offset.
+ return !limit.isPushDown() && limit.getFetch() != null;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final LimitPrel limit = (LimitPrel) call.rel(0);
+ final UnionExchangePrel unionExchangePrel = (UnionExchangePrel) call.rel(1);
+
+ RelNode child = unionExchangePrel.getInput();
+
+ final int offset = limit.getOffset() != null ? Math.max(0, RexLiteral.intValue(limit.getOffset())) : 0;
+ final int fetch = Math.max(0, RexLiteral.intValue(limit.getFetch()));
+
+ // child Limit uses conservative approach: use offset 0 and fetch = parent limit offset + parent limit fetch.
+ final RexNode childFetch = limit.getCluster().getRexBuilder().makeExactLiteral(BigDecimal.valueOf(offset + fetch));
+
+ final RelNode limitUnderExchange = new LimitPrel(child.getCluster(), child.getTraitSet(), child, null, childFetch);
+ final RelNode newUnionExch = new UnionExchangePrel(unionExchangePrel.getCluster(), unionExchangePrel.getTraitSet(), limitUnderExchange);
+ final RelNode limitAboveExchange = new LimitPrel(limit.getCluster(), limit.getTraitSet(), newUnionExch, limit.getOffset(), limit.getFetch(), true);
+
+ call.transformTo(limitAboveExchange);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/846df965/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
index 0e4d734..789a536 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
@@ -18,12 +18,95 @@
package org.apache.drill.exec.physical.impl.limit;
import org.apache.drill.BaseTestQuery;
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.common.util.TestTools;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
public class TestLimitWithExchanges extends BaseTestQuery {
@Test
public void testLimitWithExchanges() throws Exception{
testPhysicalFromFile("limit/limit_exchanges.json");
}
+
+ @Test
+ public void testPushLimitPastUnionExchange() throws Exception {
+ // Push limit past through UnionExchange.
+ final String WORKING_PATH = TestTools.getWorkingPath();
+ final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
+
+ try {
+ test("alter session set `planner.slice_target` = 1");
+ final String[] excludedPlan = {};
+
+ // case 1. single table query.
+ final String sql = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 1 offset 2", TEST_RES_PATH);
+ final String[] expectedPlan ={"(?s)Limit\\(offset=\\[2\\], fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[3\\]\\).*Scan"};
+ testLimitHelper(sql, expectedPlan, excludedPlan, 1);
+
+ final String sql2 = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 1 offset 0", TEST_RES_PATH);
+ final String[] expectedPlan2 = {"(?s)Limit\\(offset=\\[0\\], fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[1\\]\\).*Scan"};
+ testLimitHelper(sql2, expectedPlan2, excludedPlan, 1);
+
+ final String sql3 = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 1", TEST_RES_PATH);
+ final String[] expectedPlan3 = {"(?s)Limit\\(fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[1\\]\\).*Scan"};
+ testLimitHelper(sql3, expectedPlan3, excludedPlan, 1);
+
+ // case 2: join query.
+ final String sql4 = String.format(
+ "select * from dfs_test.`%s/tpchmulti/region` r, dfs_test.`%s/tpchmulti/nation` n " +
+ "where r.r_regionkey = n.n_regionkey limit 1 offset 2", TEST_RES_PATH, TEST_RES_PATH );
+
+ final String[] expectedPlan4 = {"(?s)Limit\\(offset=\\[2\\], fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[3\\]\\).*Join"};
+
+ testLimitHelper(sql4, expectedPlan4, excludedPlan, 1);
+
+ final String sql5 = String.format(
+ "select * from dfs_test.`%s/tpchmulti/region` r, dfs_test.`%s/tpchmulti/nation` n " +
+ "where r.r_regionkey = n.n_regionkey limit 1", TEST_RES_PATH, TEST_RES_PATH );
+
+ final String[] expectedPlan5 = {"(?s)Limit\\(fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[1\\]\\).*Join"};
+ testLimitHelper(sql5, expectedPlan5, excludedPlan, 1);
+ } finally {
+ test("alter session set `planner.slice_target` = " + ExecConstants.SLICE_TARGET_OPTION.getDefault().getValue());
+ }
+ }
+
+ @Test
+ public void testNegPushLimitPastUnionExchange() throws Exception {
+ // Negative case: should not push limit past through UnionExchange.
+ final String WORKING_PATH = TestTools.getWorkingPath();
+ final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
+
+ try {
+ test("alter session set `planner.slice_target` = 1");
+ final String[] expectedPlan ={};
+
+ // case 1. Only "offset", but no "limit" : should not push "limit" down.
+ final String sql = String.format("select * from dfs_test.`%s/tpchmulti/region` offset 2", TEST_RES_PATH);
+ final String[] excludedPlan = {"(?s)Limit\\(offset=\\[2\\].*UnionExchange.*Limit.*Scan"};
+
+ // case 2. "limit" is higher than # of rowcount in table : should not push "limit" down.
+ final String sql2 = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 100", TEST_RES_PATH);
+ final String[] excludedPlan2 = {"(?s)Limit\\(fetch=\\[100\\].*UnionExchange.*Limit.*Scan"};
+
+ testLimitHelper(sql2, expectedPlan, excludedPlan2, 5);
+ } finally {
+ test("alter session set `planner.slice_target` = " + ExecConstants.SLICE_TARGET_OPTION.getDefault().getValue());
+ }
+ }
+
+ private void testLimitHelper(final String sql, final String[] expectedPlan, final String[] excludedPattern, int expectedRecordCount) throws Exception {
+ // Validate the plan
+ PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPattern);
+
+ // Validate row count
+ final int actualRecordCount = testSql(sql);
+ assertEquals(String.format("Received unexpected number of rows in output: expected=%d, received=%s", expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
+ }
+
}