You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2018/06/02 04:50:15 UTC
[drill] 07/10: DRILL-6456: Planner shouldn't create any exchanges on the right side of Lateral Join.
This is an automated email from the ASF dual-hosted git repository.
parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit fde66d776cb16e77c1b3436c8a21364a43fb5f8f
Author: Hanumath Rao Maduri <ha...@gmail.com>
AuthorDate: Wed May 30 17:59:51 2018 -0700
DRILL-6456: Planner shouldn't create any exchanges on the right side of Lateral Join.
This closes #1299
---
.../visitor/ExcessiveExchangeIdentifier.java | 72 ++++++++-
.../impl/lateraljoin/TestLateralPlans.java | 164 ++++++++++++++++++++-
2 files changed, 227 insertions(+), 9 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
index 7bfe214..b4ed5e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
@@ -19,8 +19,8 @@ package org.apache.drill.exec.planner.physical.visitor;
import java.util.Collections;
import java.util.List;
-
import org.apache.drill.exec.planner.fragment.DistributionAffinity;
+import org.apache.drill.exec.planner.physical.CorrelatePrel;
import org.apache.drill.exec.planner.physical.ExchangePrel;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.ScanPrel;
@@ -28,9 +28,11 @@ import org.apache.drill.exec.planner.physical.ScreenPrel;
import org.apache.calcite.rel.RelNode;
import com.google.common.collect.Lists;
+import org.apache.drill.exec.planner.physical.UnnestPrel;
public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, ExcessiveExchangeIdentifier.MajorFragmentStat, RuntimeException> {
private final long targetSliceSize;
+ private CorrelatePrel topMostLateralJoin = null;
public ExcessiveExchangeIdentifier(long targetSliceSize) {
this.targetSliceSize = targetSliceSize;
@@ -45,18 +47,28 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
public Prel visitExchange(ExchangePrel prel, MajorFragmentStat parent) throws RuntimeException {
parent.add(prel);
MajorFragmentStat newFrag = new MajorFragmentStat();
+ newFrag.setRightSideOfLateral(parent.isRightSideOfLateral());
Prel newChild = ((Prel) prel.getInput()).accept(this, newFrag);
-
- if (newFrag.isSingular() && parent.isSingular() &&
- // if one of them has strict distribution or none, we can remove the exchange
- (!newFrag.isDistributionStrict() || !parent.isDistributionStrict())
- ) {
+ if (canRemoveExchange(parent, newFrag)) {
return newChild;
} else {
return (Prel) prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode) newChild));
}
}
+ private boolean canRemoveExchange(MajorFragmentStat parentFrag, MajorFragmentStat childFrag) {
+ if (childFrag.isSingular() && parentFrag.isSingular() &&
+ (!childFrag.isDistributionStrict() || !parentFrag.isDistributionStrict())) {
+ return true;
+ }
+
+ if (parentFrag.isRightSideOfLateral()) {
+ return true;
+ }
+
+ return false;
+ }
+
@Override
public Prel visitScreen(ScreenPrel prel, MajorFragmentStat s) throws RuntimeException {
s.addScreen(prel);
@@ -71,6 +83,40 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
}
@Override
+ public Prel visitCorrelate(CorrelatePrel prel, MajorFragmentStat s) throws RuntimeException {
+ List<RelNode> children = Lists.newArrayList();
+ s.add(prel);
+
+ for (Prel p : prel) {
+ s.add(p);
+ }
+
+ // Traverse the left side of the Lateral join first. Left side of the
+ // Lateral shouldn't have any restrictions on Exchanges.
+ children.add(((Prel)prel.getInput(0)).accept(this, s));
+ // Save the outermost Lateral join so as to unset the flag later.
+ if (topMostLateralJoin == null) {
+ topMostLateralJoin = prel;
+ }
+
+ // Right side of the Lateral shouldn't have any Exchanges. Hence set the
+ // flag so that visitExchange removes the exchanges.
+ s.setRightSideOfLateral(true);
+ children.add(((Prel)prel.getInput(1)).accept(this, s));
+ if (topMostLateralJoin == prel) {
+ topMostLateralJoin = null;
+ s.setRightSideOfLateral(false);
+ }
+ return (Prel) prel.copy(prel.getTraitSet(), children);
+ }
+
+ @Override
+ public Prel visitUnnest(UnnestPrel prel, MajorFragmentStat s) throws RuntimeException {
+ s.addUnnest(prel);
+ return prel;
+ }
+
+ @Override
public Prel visitPrel(Prel prel, MajorFragmentStat s) throws RuntimeException {
List<RelNode> children = Lists.newArrayList();
s.add(prel);
@@ -98,6 +144,7 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
private double maxRows = 0d;
private int maxWidth = Integer.MAX_VALUE;
private boolean isMultiSubScan = false;
+ private boolean rightSideOfLateral = false;
public void add(Prel prel) {
maxRows = Math.max(prel.estimateRowCount(prel.getCluster().getMetadataQuery()), maxRows);
@@ -130,9 +177,20 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
return w == 1;
}
+ public boolean isRightSideOfLateral() {
+ return this.rightSideOfLateral;
+ }
+
+ public void addUnnest(UnnestPrel prel) {
+ add(prel);
+ }
+
+ public void setRightSideOfLateral(boolean rightSideOfLateral) {
+ this.rightSideOfLateral = rightSideOfLateral;
+ }
+
public boolean isDistributionStrict() {
return distributionAffinity == DistributionAffinity.HARD;
}
}
-
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
index 9e19729..00ab971 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
@@ -18,10 +18,13 @@
package org.apache.drill.exec.physical.impl.lateraljoin;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import org.apache.drill.PlanTestBase;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.test.ClientFixture;
import org.apache.drill.test.ClusterFixture;
@@ -30,10 +33,18 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.Ignore;
+import java.nio.file.Paths;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
public class TestLateralPlans extends BaseTestQuery {
+ private static final String regularTestFile_1 = "cust_order_10_1.json";
+ private static final String regularTestFile_2 = "cust_order_10_2.json";
@BeforeClass
public static void enableUnnestLateral() throws Exception {
+ dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", regularTestFile_1));
+ dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", regularTestFile_2));
test("alter session set `planner.enable_unnest_lateral`=true");
}
@@ -255,7 +266,7 @@ public class TestLateralPlans extends BaseTestQuery {
.sql(Sql)
.run();
} catch (UserRemoteException ex) {
- assert(ex.getMessage().contains("Alias table and column name are required for UNNEST"));
+ assertTrue(ex.getMessage().contains("Alias table and column name are required for UNNEST"));
}
}
@@ -272,7 +283,156 @@ public class TestLateralPlans extends BaseTestQuery {
.sql(Sql)
.run();
} catch (UserRemoteException ex) {
- assert(ex.getMessage().contains("Alias table and column name are required for UNNEST"));
+ assertTrue(ex.getMessage().contains("Alias table and column name are required for UNNEST"));
+ }
+ }
+
+ /***********************************************************************************************
+ Following test cases are introduced to make sure no exchanges are present on right side of
+ Lateral join.
+ **********************************************************************************************/
+
+ @Test
+ public void testNoExchangeWithAggWithoutGrpBy() throws Exception {
+ String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t," +
+ " lateral ( select sum(t2.ord.o_totalprice) as totalprice from unnest(t.c_orders) t2(ord)) d1";
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+ .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String explain = client.queryBuilder().sql(Sql).explainText();
+ String rightChild = getRightChildOfLateral(explain);
+ assertFalse(rightChild.contains("Exchange"));
+ }
+ }
+
+ @Test
+ public void testNoExchangeWithStreamAggWithGrpBy() throws Exception {
+ String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t," +
+ " lateral ( select sum(t2.ord.o_totalprice) as totalprice from unnest(t.c_orders) t2(ord) group by t2.ord.o_orderkey) d1";
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+ .setOptionDefault(ExecConstants.SLICE_TARGET, 1)
+ .setOptionDefault(PlannerSettings.HASHAGG.getOptionName(), false)
+ .setOptionDefault(PlannerSettings.STREAMAGG.getOptionName(), true);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String explain = client.queryBuilder().sql(Sql).explainText();
+ String rightChild = getRightChildOfLateral(explain);
+ assertFalse(rightChild.contains("Exchange"));
}
}
+
+ @Test
+ public void testNoExchangeWithHashAggWithGrpBy() throws Exception {
+ String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t," +
+ " lateral ( select sum(t2.ord.o_totalprice) as totalprice from unnest(t.c_orders) t2(ord) group by t2.ord.o_orderkey) d1";
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+ .setOptionDefault(ExecConstants.SLICE_TARGET, 1)
+ .setOptionDefault(PlannerSettings.HASHAGG.getOptionName(), true)
+ .setOptionDefault(PlannerSettings.STREAMAGG.getOptionName(), false);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String explain = client.queryBuilder().sql(Sql).explainText();
+ String rightChild = getRightChildOfLateral(explain);
+ assertFalse(rightChild.contains("Exchange"));
+ }
+ }
+
+ @Test
+ public void testNoExchangeWithOrderByWithoutLimit() throws Exception {
+ String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t," +
+ " lateral ( select t2.ord.o_totalprice as totalprice from unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey) d1";
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+ .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String explain = client.queryBuilder().sql(Sql).explainText();
+ String rightChild = getRightChildOfLateral(explain);
+ assertFalse(rightChild.contains("Exchange"));
+ }
+ }
+
+ @Test
+ public void testNoExchangeWithOrderByLimit() throws Exception {
+ String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t," +
+ " lateral ( select t2.ord.o_totalprice as totalprice from unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1";
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+ .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String explain = client.queryBuilder().sql(Sql).explainText();
+ String rightChild = getRightChildOfLateral(explain);
+ assertFalse(rightChild.contains("Exchange"));
+ }
+ }
+
+
+ @Test
+ public void testNoExchangeWithLateralsDownStreamJoin() throws Exception {
+ String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t, dfs.`lateraljoin/multipleFiles` t2, " +
+ " lateral ( select t2.ord.o_totalprice as totalprice from unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1" +
+ " where t.c_name = t2.c_name";
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+ .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String explain = client.queryBuilder().sql(Sql).explainText();
+ String rightChild = getRightChildOfLateral(explain);
+ assertFalse(rightChild.contains("Exchange"));
+ }
+ }
+
+ @Test
+ public void testNoExchangeWithLateralsDownStreamUnion() throws Exception {
+ String Sql = "select t.c_name from dfs.`lateraljoin/multipleFiles` t union all " +
+ " select t.c_name from dfs.`lateraljoin/multipleFiles` t, " +
+ " lateral ( select t2.ord.o_totalprice as totalprice from unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1";
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+ .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String explain = client.queryBuilder().sql(Sql).explainText();
+ String rightChild = getRightChildOfLateral(explain);
+ assertFalse(rightChild.contains("Exchange"));
+ }
+ }
+
+ @Test
+ public void testNoExchangeWithLateralsDownStreamAgg() throws Exception {
+ String Sql = "select sum(d1.totalprice) from dfs.`lateraljoin/multipleFiles` t, " +
+ " lateral ( select t2.ord.o_totalprice as totalprice from unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1 group by t.c_custkey";
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+ .setOptionDefault(ExecConstants.SLICE_TARGET, 1)
+ .setOptionDefault(PlannerSettings.HASHAGG.getOptionName(), false)
+ .setOptionDefault(PlannerSettings.STREAMAGG.getOptionName(), true);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String explain = client.queryBuilder().sql(Sql).explainText();
+ String rightChild = getRightChildOfLateral(explain);
+ assertFalse(rightChild.contains("Exchange"));
+ }
+ }
+
+ private String getRightChildOfLateral(String explain) throws Exception {
+ Matcher matcher = Pattern.compile("Correlate.*Unnest", Pattern.MULTILINE | Pattern.DOTALL).matcher(explain);
+ assertTrue (matcher.find());
+ String CorrelateUnnest = matcher.group(0);
+ return CorrelateUnnest.substring(CorrelateUnnest.lastIndexOf("Scan"));
+ }
}
--
To stop receiving notification emails like this one, please contact
parthc@apache.org.