You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2015/09/26 08:21:03 UTC

drill git commit: DRILL-1457: Push Limit past through UnionExchange.

Repository: drill
Updated Branches:
  refs/heads/master b8573526d -> 846df965e


DRILL-1457: Push Limit past through UnionExchange.

Close apache/drill#169


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/846df965
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/846df965
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/846df965

Branch: refs/heads/master
Commit: 846df965ea85c4712007ef6a6e9a78f396aa50e4
Parents: b857352
Author: Jinfeng Ni <jn...@apache.org>
Authored: Mon Sep 21 22:42:23 2015 -0700
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Fri Sep 25 23:16:38 2015 -0700

----------------------------------------------------------------------
 .../exec/planner/common/DrillLimitRelBase.java  | 30 ++++++-
 .../exec/planner/logical/DrillRuleSets.java     |  2 +
 .../drill/exec/planner/physical/LimitPrel.java  |  6 +-
 .../LimitUnionExchangeTransposeRule.java        | 66 ++++++++++++++++
 .../impl/limit/TestLimitWithExchanges.java      | 83 ++++++++++++++++++++
 5 files changed, 183 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/846df965/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java
index a19dff7..8c21c4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLimitRelBase.java
@@ -36,11 +36,21 @@ import org.apache.calcite.rex.RexNode;
 public abstract class DrillLimitRelBase extends SingleRel implements DrillRelNode {
   protected RexNode offset;
   protected RexNode fetch;
+  private boolean pushDown;  // whether limit has been pushed past its child.
+                             // Limit is special in that when it's pushed down, the original LIMIT still remains.
+                             // Once the limit is pushed down, this flag will be TRUE for the original LIMIT
+                             // and be FALSE for the pushed down LIMIT.
+                             // This flag prevents optimization rules from firing in a loop.
 
   public DrillLimitRelBase(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch) {
+    this(cluster, traitSet, child, offset, fetch, false);
+  }
+
+  public DrillLimitRelBase(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch, boolean pushDown) {
     super(cluster, traitSet, child);
     this.offset = offset;
     this.fetch = fetch;
+    this.pushDown = pushDown;
   }
 
   public RexNode getOffset() {
@@ -57,9 +67,7 @@ public abstract class DrillLimitRelBase extends SingleRel implements DrillRelNod
       return super.computeSelfCost(planner).multiplyBy(.1);
     }
 
-    int off = offset != null ? RexLiteral.intValue(offset) : 0 ;
-    int f = fetch != null ? RexLiteral.intValue(fetch) : 0 ;
-    double numRows = off + f;
+    double numRows = getRows();
     double cpuCost = DrillCostBase.COMPARE_CPU_COST * numRows;
     DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
     return costFactory.makeCost(numRows, cpuCost, 0, 0);
@@ -73,4 +81,20 @@ public abstract class DrillLimitRelBase extends SingleRel implements DrillRelNod
     return pw;
   }
 
+  @Override
+  public double getRows() {
+    int off = offset != null ? RexLiteral.intValue(offset) : 0 ;
+
+    if (fetch == null) {
+      return getInput().getRows() - off;
+    } else {
+      int f = RexLiteral.intValue(fetch);
+      return off + f;
+    }
+  }
+
+  public boolean isPushDown() {
+    return this.pushDown;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/846df965/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 3fa21db..65d6e89 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -45,6 +45,7 @@ import org.apache.drill.exec.planner.physical.FilterPrule;
 import org.apache.drill.exec.planner.physical.HashAggPrule;
 import org.apache.drill.exec.planner.physical.HashJoinPrule;
 import org.apache.drill.exec.planner.physical.LimitPrule;
+import org.apache.drill.exec.planner.physical.LimitUnionExchangeTransposeRule;
 import org.apache.drill.exec.planner.physical.MergeJoinPrule;
 import org.apache.drill.exec.planner.physical.NestedLoopJoinPrule;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -221,6 +222,7 @@ public class DrillRuleSets {
     ruleList.add(WriterPrule.INSTANCE);
     ruleList.add(WindowPrule.INSTANCE);
     ruleList.add(PushLimitToTopN.INSTANCE);
+    ruleList.add(LimitUnionExchangeTransposeRule.INSTANCE);
     ruleList.add(UnionAllPrule.INSTANCE);
     ruleList.add(ValuesPrule.INSTANCE);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/846df965/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
index f3e9991..6cfbe2f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
@@ -38,9 +38,13 @@ public class LimitPrel extends DrillLimitRelBase implements Prel {
     super(cluster, traitSet, child, offset, fetch);
   }
 
+  public LimitPrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch, boolean pushDown) {
+    super(cluster, traitSet, child, offset, fetch, pushDown);
+  }
+
   @Override
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-    return new LimitPrel(getCluster(), traitSet, sole(inputs), offset, fetch);
+    return new LimitPrel(getCluster(), traitSet, sole(inputs), offset, fetch, isPushDown());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/846df965/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitUnionExchangeTransposeRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitUnionExchangeTransposeRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitUnionExchangeTransposeRule.java
new file mode 100644
index 0000000..25a7ce4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitUnionExchangeTransposeRule.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+
+import java.math.BigDecimal;
+
+public class LimitUnionExchangeTransposeRule extends Prule{
+  public static final RelOptRule INSTANCE = new LimitUnionExchangeTransposeRule();
+
+  private LimitUnionExchangeTransposeRule() {
+    super(RelOptHelper.some(LimitPrel.class, RelOptHelper.any(UnionExchangePrel.class)), "LimitUnionExchangeTransposeRule");
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final LimitPrel limit = (LimitPrel) call.rel(0);
+
+    // There are two situations in which we do not fire this rule:
+    // 1) the limit has already been pushed down to its child,
+    // 2) getFetch() is null (indicating we have to fetch all the remaining rows starting from offset).
+    return !limit.isPushDown() && limit.getFetch() != null;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final LimitPrel limit = (LimitPrel) call.rel(0);
+    final UnionExchangePrel unionExchangePrel = (UnionExchangePrel) call.rel(1);
+
+    RelNode child = unionExchangePrel.getInput();
+
+    final int offset = limit.getOffset() != null ? Math.max(0, RexLiteral.intValue(limit.getOffset())) : 0;
+    final int fetch = Math.max(0, RexLiteral.intValue(limit.getFetch()));
+
+    // The child Limit uses a conservative approach: offset 0 and fetch = parent limit offset + parent limit fetch.
+    final RexNode childFetch = limit.getCluster().getRexBuilder().makeExactLiteral(BigDecimal.valueOf(offset + fetch));
+
+    final RelNode limitUnderExchange = new LimitPrel(child.getCluster(), child.getTraitSet(), child, null, childFetch);
+    final RelNode newUnionExch = new UnionExchangePrel(unionExchangePrel.getCluster(), unionExchangePrel.getTraitSet(), limitUnderExchange);
+    final RelNode limitAboveExchange = new LimitPrel(limit.getCluster(), limit.getTraitSet(), newUnionExch, limit.getOffset(), limit.getFetch(), true);
+
+    call.transformTo(limitAboveExchange);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/846df965/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
index 0e4d734..789a536 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
@@ -18,12 +18,95 @@
 package org.apache.drill.exec.physical.impl.limit;
 
 import org.apache.drill.BaseTestQuery;
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.common.util.TestTools;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 public class TestLimitWithExchanges extends BaseTestQuery {
 
   @Test
   public void testLimitWithExchanges() throws Exception{
     testPhysicalFromFile("limit/limit_exchanges.json");
   }
+
+  @Test
+  public void testPushLimitPastUnionExchange() throws Exception {
+    // Push limit past through UnionExchange.
+    final String WORKING_PATH = TestTools.getWorkingPath();
+    final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
+
+    try {
+      test("alter session set `planner.slice_target` = 1");
+      final String[] excludedPlan = {};
+
+      // case 1. single table query.
+      final String sql = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 1 offset 2", TEST_RES_PATH);
+      final String[] expectedPlan ={"(?s)Limit\\(offset=\\[2\\], fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[3\\]\\).*Scan"};
+      testLimitHelper(sql, expectedPlan, excludedPlan, 1);
+
+      final String sql2 = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 1 offset 0", TEST_RES_PATH);
+      final String[] expectedPlan2 = {"(?s)Limit\\(offset=\\[0\\], fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[1\\]\\).*Scan"};
+      testLimitHelper(sql2, expectedPlan2, excludedPlan, 1);
+
+      final String sql3 = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 1", TEST_RES_PATH);
+      final String[] expectedPlan3 = {"(?s)Limit\\(fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[1\\]\\).*Scan"};
+      testLimitHelper(sql3, expectedPlan3, excludedPlan, 1);
+
+      // case 2: join query.
+      final String sql4 = String.format(
+          "select * from dfs_test.`%s/tpchmulti/region` r,  dfs_test.`%s/tpchmulti/nation` n " +
+          "where r.r_regionkey = n.n_regionkey limit 1 offset 2", TEST_RES_PATH, TEST_RES_PATH );
+
+      final String[] expectedPlan4 = {"(?s)Limit\\(offset=\\[2\\], fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[3\\]\\).*Join"};
+
+      testLimitHelper(sql4, expectedPlan4, excludedPlan, 1);
+
+      final String sql5 = String.format(
+          "select * from dfs_test.`%s/tpchmulti/region` r,  dfs_test.`%s/tpchmulti/nation` n " +
+              "where r.r_regionkey = n.n_regionkey limit 1", TEST_RES_PATH, TEST_RES_PATH );
+
+      final String[] expectedPlan5 = {"(?s)Limit\\(fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[1\\]\\).*Join"};
+      testLimitHelper(sql5, expectedPlan5, excludedPlan, 1);
+    } finally {
+      test("alter session set `planner.slice_target` = " + ExecConstants.SLICE_TARGET_OPTION.getDefault().getValue());
+    }
+  }
+
+  @Test
+  public void testNegPushLimitPastUnionExchange() throws Exception {
+    // Negative case: should not push limit past through UnionExchange.
+    final String WORKING_PATH = TestTools.getWorkingPath();
+    final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
+
+    try {
+      test("alter session set `planner.slice_target` = 1");
+      final String[] expectedPlan ={};
+
+      // case 1. Only "offset", but no "limit" : should not push "limit" down.
+      final String sql = String.format("select * from dfs_test.`%s/tpchmulti/region` offset 2", TEST_RES_PATH);
+      final String[] excludedPlan = {"(?s)Limit\\(offset=\\[2\\].*UnionExchange.*Limit.*Scan"};
+
+      // case 2. "limit" is higher than the number of rows in the table : should not push "limit" down.
+      final String sql2 = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 100", TEST_RES_PATH);
+      final String[] excludedPlan2 = {"(?s)Limit\\(fetch=\\[100\\].*UnionExchange.*Limit.*Scan"};
+
+      testLimitHelper(sql2, expectedPlan, excludedPlan2, 5);
+    } finally {
+      test("alter session set `planner.slice_target` = " + ExecConstants.SLICE_TARGET_OPTION.getDefault().getValue());
+    }
+  }
+
+  private void testLimitHelper(final String sql, final String[] expectedPlan, final String[] excludedPattern, int expectedRecordCount) throws Exception {
+    // Validate the plan
+    PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPattern);
+
+    // Validate row count
+    final int actualRecordCount = testSql(sql);
+    assertEquals(String.format("Received unexpected number of rows in output: expected=%d, received=%s", expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
+  }
+
 }