You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2018/12/12 12:24:27 UTC
[drill] branch master updated: DRILL-6878: Use DrillPushRowKeyJoinToScan rule on DrillJoin pattern to account for DrillSemiJoin
This is an automated email from the ASF dual-hosted git repository.
vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new b954c57 DRILL-6878: Use DrillPushRowKeyJoinToScan rule on DrillJoin pattern to account for DrillSemiJoin
b954c57 is described below
commit b954c57c3c1088c4ae4765c2ea2f165d70c2c31d
Author: Gautam Parai <gp...@maprtech.com>
AuthorDate: Mon Dec 3 18:03:23 2018 -0800
DRILL-6878: Use DrillPushRowKeyJoinToScan rule on DrillJoin pattern to account for DrillSemiJoin
closes #1568
---
.../drill/maprdb/tests/index/IndexPlanTest.java | 54 +++++++--------
.../drill/exec/planner/logical/DrillJoin.java | 3 +
.../drill/exec/planner/logical/DrillJoinRel.java | 5 ++
.../logical/DrillPushRowKeyJoinToScanRule.java | 16 ++---
.../exec/planner/logical/DrillSemiJoinRel.java | 5 ++
.../planner/logical/RowKeyJoinCallContext.java | 6 +-
.../drill/exec/planner/logical/RowKeyJoinRel.java | 77 +++++++++++++++++++++-
.../drill/exec/planner/physical/HashJoinPrule.java | 4 +-
.../drill/exec/planner/physical/JoinPruleBase.java | 25 +++----
.../drill/exec/planner/physical/MergeJoinPrel.java | 5 ++
.../exec/planner/physical/MergeJoinPrule.java | 4 +-
.../exec/planner/physical/NestedLoopJoinPrel.java | 5 ++
.../exec/planner/physical/NestedLoopJoinPrule.java | 2 +-
.../exec/planner/physical/RowKeyJoinPrel.java | 9 ++-
14 files changed, 162 insertions(+), 58 deletions(-)
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java
index 18e2316..bec5d3a 100644
--- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java
@@ -1130,6 +1130,7 @@ public class IndexPlanTest extends BaseJsonTest {
);
}
+ @Ignore
@Test
public void testCastTimestampPlan() throws Exception {
String query = "SELECT t.id.ssn as ssn FROM hbase.`index_test_primary` as t " +
@@ -1698,8 +1699,7 @@ public class IndexPlanTest extends BaseJsonTest {
public void testRowkeyJoinPushdown_1() throws Exception {
// _id IN (select col ...)
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where _id in (select t2._id " +
- " from hbase.`index_test_primary` t2 where cast(t2.activity.irs.firstlogin as timestamp) = " +
- " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))";
+ " from hbase.`index_test_primary` t2 where t2.address.city = 'pfrrs' and t2.address.state = 'pc')";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
@@ -1707,6 +1707,7 @@ public class IndexPlanTest extends BaseJsonTest {
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
+ .baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
@@ -1717,8 +1718,7 @@ public class IndexPlanTest extends BaseJsonTest {
public void testRowkeyJoinPushdown_2() throws Exception {
// _id = col
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
- " where t1._id = t2._id and cast(t2.activity.irs.firstlogin as timestamp) = " +
- " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')";
+ " where t1._id = t2._id and t2.address.city = 'pfrrs' and t2.address.state = 'pc'";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
@@ -1726,6 +1726,7 @@ public class IndexPlanTest extends BaseJsonTest {
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
+ .baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
@@ -1736,9 +1737,7 @@ public class IndexPlanTest extends BaseJsonTest {
public void testRowkeyJoinPushdown_3() throws Exception {
// filters on both sides of the join
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
- " where t1._id = t2._id and cast(t2.activity.irs.firstlogin as timestamp) = " +
- " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S') and cast(t1.activity.irs.firstlogin as timestamp) = " +
- " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S') ";
+ " where t1._id = t2._id and t1.address.city = 'pfrrs' and t2.address.city = 'pfrrs'";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
@@ -1746,6 +1745,7 @@ public class IndexPlanTest extends BaseJsonTest {
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
+ .baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
@@ -1756,8 +1756,7 @@ public class IndexPlanTest extends BaseJsonTest {
public void testRowkeyJoinPushdown_4() throws Exception {
// _id = cast(col as int) works since the rowids are internally cast to string!
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
- " where t1._id = cast(t2.rowid as int) and cast(t2.activity.irs.firstlogin as timestamp) = " +
- " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')";
+ " where t1._id = cast(t2.rowid as int) and t2.address.city = 'pfrrs'";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
@@ -1765,6 +1764,7 @@ public class IndexPlanTest extends BaseJsonTest {
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
+ .baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
@@ -1775,8 +1775,7 @@ public class IndexPlanTest extends BaseJsonTest {
public void testRowkeyJoinPushdown_5() throws Exception {
// _id = cast(cast(col as int) as varchar(10)
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
- " where t1._id = cast(cast(t2.rowid as int) as varchar(10)) " +
- " and cast(t2.activity.irs.firstlogin as timestamp) = to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')";
+ " where t1._id = cast(cast(t2.rowid as int) as varchar(10)) and t2.address.city = 'pfrrs'";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
@@ -1784,6 +1783,7 @@ public class IndexPlanTest extends BaseJsonTest {
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
+ .baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
@@ -1795,15 +1795,14 @@ public class IndexPlanTest extends BaseJsonTest {
// _id IN (select cast(cast(col as int) as varchar(10) ... JOIN ...)
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where _id in " +
"(select cast(cast(t2.rowid as int) as varchar(10)) from hbase.`index_test_primary` t2, hbase.`index_test_primary` t3 " +
- "where t2.address.city = t3.address.city and cast(t2.activity.irs.firstlogin as timestamp) = " +
- "to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))";
+ "where t2.address.city = t3.address.city and t2.name.fname = 'ubar')";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
testBuilder()
.sqlQuery(query)
.ordered()
- .baselineColumns("ssn").baselineValues("100007423")
+ .baselineColumns("ssn").baselineValues("100001382")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
@@ -1814,17 +1813,17 @@ public class IndexPlanTest extends BaseJsonTest {
public void testRowkeyJoinPushdown_7() throws Exception {
// with non-covering index
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
- "where t1._id = t2.rowid and cast(t2.activity.irs.firstlogin as timestamp) = " +
- "to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')";
+ "where t1._id = t2.rowid and t2.address.city = 'pfrrs'";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + incrnonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query,
- new String[] {"RowKeyJoin", "RestrictedJsonTableGroupScan", "RowKeyJoin", "indexName=hash_i_cast_timestamp_firstlogin"},
+ new String[] {"RowKeyJoin", "RestrictedJsonTableGroupScan", "RowKeyJoin", "Scan.*condition=\\(address.city = \"pfrrs\"\\)"},
new String[] {});
testBuilder()
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
+ .baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
@@ -1855,8 +1854,7 @@ public class IndexPlanTest extends BaseJsonTest {
public void testRowkeyJoinPushdown_9() throws Exception {
// Negative test - rowkey join should not be present
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where cast(_id as varchar(10)) in " +
- "(select t2._id from hbase.`index_test_primary` t2 where cast(t2.activity.irs.firstlogin as timestamp) = " +
- " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))";
+ "(select t2._id from hbase.`index_test_primary` t2 where t2.address.city = 'pfrrs')";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {}, new String[] {"RowKeyJoin"});
@@ -1864,6 +1862,7 @@ public class IndexPlanTest extends BaseJsonTest {
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
+ .baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
@@ -1874,8 +1873,7 @@ public class IndexPlanTest extends BaseJsonTest {
public void testRowkeyJoinPushdown_10() throws Exception {
// Negative test - rowkey join should not be present
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
- " where cast(t1._id as varchar(10)) = cast(t2._id as varchar(10)) and cast(t2.activity.irs.firstlogin as timestamp) = " +
- " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')";
+ " where cast(t1._id as varchar(10)) = cast(t2._id as varchar(10)) and t2.address.city = 'pfrrs'";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {}, new String[] {"RowKeyJoin"});
@@ -1883,6 +1881,7 @@ public class IndexPlanTest extends BaseJsonTest {
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
+ .baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
@@ -1894,7 +1893,7 @@ public class IndexPlanTest extends BaseJsonTest {
// Negative test - rowkey join should not be present
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where cast(_id as varchar(10)) in " +
"(select t2._id from hbase.`index_test_primary` t2, hbase.`index_test_primary` t3 where t2.address.city = t3.address.city " +
- "and cast(t2.activity.irs.firstlogin as timestamp) = to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))";
+ "and t2.address.city = 'pfrrs')";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[] {}, new String[] {"RowKeyJoin"});
@@ -1902,20 +1901,21 @@ public class IndexPlanTest extends BaseJsonTest {
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
+ .baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
}
}
+ @Ignore
@Test
public void testRowkeyJoinPushdown_12() throws Exception {
// JOIN _id IN (select cast(cast(col as int) as varchar(10) ... JOIN ...) - rowkey join appears in intermediate join order
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t4 " +
"where t1.address.city = t4.address.city and t1._id in (select cast(cast(t2.rowid as int) as varchar(10)) " +
"from hbase.`index_test_primary` t2, hbase.`index_test_primary` t3 where t2.address.city = t3.address.city " +
- "and cast(t2.activity.irs.firstlogin as timestamp) = to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')) " +
- "and t4.address.state = 'pc'";
+ "and t2.address.state = 'pc') and t4.address.state = 'pc'";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query,
@@ -1932,12 +1932,12 @@ public class IndexPlanTest extends BaseJsonTest {
}
}
+ @Ignore
@Test
public void testRowkeyJoinPushdown_13() throws Exception {
// Check option planner.rowkeyjoin_conversion_using_hashjoin works as expected!
String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where _id in (select t2._id " +
- " from hbase.`index_test_primary` t2 where cast(t2.activity.irs.firstlogin as timestamp) = " +
- " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))";
+ " from hbase.`index_test_primary` t2 where t2.address.city = 'pfrrs')";
try {
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
PlanTestBase.testPlanMatchingPatterns(query, new String[]{"RowKeyJoin"}, new String[]{});
@@ -1945,6 +1945,7 @@ public class IndexPlanTest extends BaseJsonTest {
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
+ .baselineColumns("ssn").baselineValues("100008861")
.go();
test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";" +
forceRowKeyJoinConversionUsingHashJoin + ";");
@@ -1953,6 +1954,7 @@ public class IndexPlanTest extends BaseJsonTest {
.sqlQuery(query)
.ordered()
.baselineColumns("ssn").baselineValues("100007423")
+ .baselineColumns("ssn").baselineValues("100008861")
.go();
} finally {
test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";" +
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java
index 30067da..6be7234 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java
@@ -45,4 +45,7 @@ public interface DrillJoin extends DrillRelNode {
/* Right RelNode of the Join Relation */
RelNode getRight();
+
+ /* Is this join a semi-join? */
+ boolean isSemiJoin();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
index 2559d28..724d5cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
@@ -189,4 +189,9 @@ public class DrillJoinRel extends DrillJoinRelBase implements DrillRel {
inputs.left, inputs.right, rexCondition, join.getJoinType());
return joinRel;
}
+
+ @Override
+ public boolean isSemiJoin() {
+ return false;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushRowKeyJoinToScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushRowKeyJoinToScanRule.java
index 7c0a9b7..b06c58f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushRowKeyJoinToScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushRowKeyJoinToScanRule.java
@@ -87,7 +87,7 @@ public class DrillPushRowKeyJoinToScanRule extends RelOptRule {
}
public static DrillPushRowKeyJoinToScanRule JOIN = new DrillPushRowKeyJoinToScanRule(
- RelOptHelper.any(DrillJoinRel.class), "DrillPushRowKeyJoinToScanRule_Join", new MatchRelJ());
+ RelOptHelper.any(DrillJoin.class), "DrillPushRowKeyJoinToScanRule_Join", new MatchRelJ());
public static class MatchRelJ implements MatchFunction<RowKeyJoinCallContext> {
/*
@@ -150,7 +150,7 @@ public class DrillPushRowKeyJoinToScanRule extends RelOptRule {
* plan nodes. It tries to identify some RelNode sequences e.g. Filter-Project-Scan and generates
* the context based on the identified sequence.
*/
- private RowKeyJoinCallContext generateContext(RelOptRuleCall call, DrillJoinRel joinRel,
+ private RowKeyJoinCallContext generateContext(RelOptRuleCall call, DrillJoin joinRel,
RelNode joinChildRel, RowKey rowKeyLoc, int rowKeyPos, boolean swapInputs) {
List<RelNode> matchingRels;
// Sequence of rels (PFPS, FPS, PS, FS, S) matched for this rule
@@ -199,7 +199,7 @@ public class DrillPushRowKeyJoinToScanRule extends RelOptRule {
@Override
public boolean match(RelOptRuleCall call) {
- DrillJoinRel joinRel = call.rel(0);
+ DrillJoin joinRel = call.rel(0);
//Perform validity checks
logger.debug("DrillPushRowKeyJoinToScanRule begin()");
return canPushRowKeyJoinToScan(joinRel, call.getPlanner()).left;
@@ -207,7 +207,7 @@ public class DrillPushRowKeyJoinToScanRule extends RelOptRule {
@Override
public RowKeyJoinCallContext onMatch(RelOptRuleCall call) {
- DrillJoinRel joinRel = call.rel(0);
+ DrillJoin joinRel = call.rel(0);
/*
* Find which side of the join (left/right) has the primary-key column. Then find which sequence of rels
* is present on that side of the join. We will need this sequence to correctly transform the left
@@ -234,7 +234,7 @@ public class DrillPushRowKeyJoinToScanRule extends RelOptRule {
/* Assumption : Only the non-rowkey side needs to be checked. The row-key side does not have
* any blocking operators for the transformation to work
*/
- private static boolean canSwapJoinInputs(DrillJoinRel joinRel, RowKey rowKeyLocation) {
+ private static boolean canSwapJoinInputs(DrillJoin joinRel, RowKey rowKeyLocation) {
// We cannot swap the join inputs if the join is a semi-join. We determine it indirectly, by
// checking for the presence of a aggregating Aggregate Rel (computes aggregates e.g. sum).
if (rowKeyLocation == RowKey.LEFT
@@ -281,7 +281,7 @@ public class DrillPushRowKeyJoinToScanRule extends RelOptRule {
* whether the rowkey is present on the left/right side of the join and its 0-based index in the projection of that
* side.
*/
- private static Pair<Boolean, Pair<RowKey, Integer>> canPushRowKeyJoinToScan(DrillJoinRel joinRel, RelOptPlanner planner) {
+ private static Pair<Boolean, Pair<RowKey, Integer>> canPushRowKeyJoinToScan(DrillJoin joinRel, RelOptPlanner planner) {
RowKey rowKeyLoc = RowKey.NONE;
logger.debug("canPushRowKeyJoinToScan(): Check: Rel={}", joinRel);
@@ -506,7 +506,7 @@ public class DrillPushRowKeyJoinToScanRule extends RelOptRule {
}
}
- private void doOnMatch(RelOptRuleCall call, int rowKeyPosition, boolean swapInputs, DrillJoinRel joinRel,
+ private void doOnMatch(RelOptRuleCall call, int rowKeyPosition, boolean swapInputs, DrillJoin joinRel,
DrillProjectRel upperProjectRel, DrillFilterRel filterRel, DrillProjectRel lowerProjectRel, DrillScanRel scanRel) {
// Swap the inputs, when necessary (i.e. when the primary-key col is on the right-side of the join)
logger.debug("Transforming: Swapping of join inputs is required!");
@@ -537,7 +537,7 @@ public class DrillPushRowKeyJoinToScanRule extends RelOptRule {
logger.debug("Transforming: LeftKeys={}, LeftRowType={}, RightKeys={}, RightRowType={}",
leftJoinKeys, leftRel.getRowType(), rightJoinKeys, right.getRowType());
RowKeyJoinRel rowKeyJoin = new RowKeyJoinRel(joinRel.getCluster(), joinRel.getTraitSet(), leftRel, right,
- joinCondition, joinRel.getJoinType());
+ joinCondition, joinRel.getJoinType(), joinRel instanceof DrillSemiJoinRel);
logger.info("Transforming: SUCCESS: Register runtime filter pushdown plan (rowkeyjoin)");
call.transformTo(rowKeyJoin);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
index 09e4be9..527b744 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
@@ -98,4 +98,9 @@ public class DrillSemiJoinRel extends SemiJoin implements DrillJoin, DrillRel {
return new LogicalSemiJoin(leftOp, rightOp, conditions, joinType);
}
+
+ @Override
+ public boolean isSemiJoin() {
+ return true;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinCallContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinCallContext.java
index b82e77c..abaf2ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinCallContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinCallContext.java
@@ -34,7 +34,7 @@ public class RowKeyJoinCallContext {
private int rowKeyPos;
// swapping of row-key join inputs necessary
private boolean swapInputs;
- private DrillJoinRel joinRel;
+ private DrillJoin joinRel;
// rels on the rowkey side of the join to be transformed
private DrillProjectRel upperProjectRel;
private DrillFilterRel filterRel;
@@ -42,7 +42,7 @@ public class RowKeyJoinCallContext {
private DrillScanRel scanRel;
public RowKeyJoinCallContext (RelOptRuleCall call, RowKey rowKeyLoc, int rowKeyPos, boolean swapInputs,
- DrillJoinRel joinRel, DrillProjectRel upperProjectRel, DrillFilterRel filterRel, DrillProjectRel lowerProjectRel,
+ DrillJoin joinRel, DrillProjectRel upperProjectRel, DrillFilterRel filterRel, DrillProjectRel lowerProjectRel,
DrillScanRel scanRel) {
this.call = call;
this.rowKeyLoc = rowKeyLoc;
@@ -71,7 +71,7 @@ public class RowKeyJoinCallContext {
return swapInputs;
}
- public DrillJoinRel getJoinRel() {
+ public DrillJoin getJoinRel() {
return joinRel;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinRel.java
index 2f73526..1052029 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinRel.java
@@ -18,27 +18,43 @@
package org.apache.drill.exec.planner.logical;
+import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexChecker;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.Litmus;
import org.apache.calcite.util.Pair;
import org.apache.drill.common.logical.data.Join;
import org.apache.drill.common.logical.data.LogicalOperator;
import org.apache.drill.exec.planner.torel.ConversionContext;
-
-import java.util.List;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
public class RowKeyJoinRel extends DrillJoinRel implements DrillRel {
+ /* Whether this join represents a semi-join. Tracking this as a flag avoids defining a separate logical join
+ * class (RowKeySemiJoinRel)
+ */
+ boolean isSemiJoin;
+
public RowKeyJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
JoinRelType joinType) {
super(cluster, traits, left, right, condition, joinType);
}
public RowKeyJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+ JoinRelType joinType, boolean isSemiJoin) {
+ super(cluster, traits, left, right, condition, joinType);
+ this.isSemiJoin = isSemiJoin;
+ }
+
+ public RowKeyJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
JoinRelType joinType, int joinControl) {
super(cluster, traits, left, right, condition, joinType, joinControl);
}
@@ -51,7 +67,7 @@ public class RowKeyJoinRel extends DrillJoinRel implements DrillRel {
@Override
public RowKeyJoinRel copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType,
boolean semiJoinDone) {
- return new RowKeyJoinRel(getCluster(), traitSet, left, right, condition, joinType);
+ return new RowKeyJoinRel(getCluster(), traitSet, left, right, condition, joinType, isSemiJoin());
}
@Override
@@ -59,6 +75,25 @@ public class RowKeyJoinRel extends DrillJoinRel implements DrillRel {
return super.implement(implementor);
}
+ /**
+ * Returns whether this RowKeyJoin represents a {@link org.apache.calcite.rel.core.SemiJoin}
+ * @return true if join represents a {@link org.apache.calcite.rel.core.SemiJoin}, false otherwise.
+ */
+ public boolean isSemiJoin() {
+ return isSemiJoin;
+ }
+
+ @Override
+ public RelDataType deriveRowType() {
+ return SqlValidatorUtil.deriveJoinRowType(
+ left.getRowType(),
+ isSemiJoin() ? null : right.getRowType(),
+ JoinRelType.INNER,
+ getCluster().getTypeFactory(),
+ null,
+ ImmutableList.of());
+ }
+
public static RowKeyJoinRel convert(Join join, ConversionContext context) throws InvalidRelException {
Pair<RelNode, RelNode> inputs = getJoinInputs(join, context);
RexNode rexCondition = getJoinCondition(join, context);
@@ -66,4 +101,40 @@ public class RowKeyJoinRel extends DrillJoinRel implements DrillRel {
inputs.left, inputs.right, rexCondition, join.getJoinType());
return joinRel;
}
+
+ /** The parent method relies on the class being an instance of {@link org.apache.calcite.rel.core.SemiJoin}
+ * in deciding row-type validity. We override this method to account for the RowKeyJoinRel logical rel
+ * representing both regular and semi-joins */
+ @Override public boolean isValid(Litmus litmus, Context context) {
+ if (getRowType().getFieldCount()
+ != getSystemFieldList().size()
+ + left.getRowType().getFieldCount()
+ + ((this.isSemiJoin()) ? 0 : right.getRowType().getFieldCount())) {
+ return litmus.fail("field count mismatch");
+ }
+ if (condition != null) {
+ if (condition.getType().getSqlTypeName() != SqlTypeName.BOOLEAN) {
+ return litmus.fail("condition must be boolean: {}",
+ condition.getType());
+ }
+ // The input to the condition is a row type consisting of system
+ // fields, left fields, and right fields. Very similar to the
+ // output row type, except that fields have not yet been made nullable
+ // due to outer joins.
+ RexChecker checker =
+ new RexChecker(
+ getCluster().getTypeFactory().builder()
+ .addAll(getSystemFieldList())
+ .addAll(getLeft().getRowType().getFieldList())
+ .addAll(getRight().getRowType().getFieldList())
+ .build(),
+ context, litmus);
+ condition.accept(checker);
+ if (checker.getFailureCount() > 0) {
+ return litmus.fail(checker.getFailureCount()
+ + " failures in condition " + condition);
+ }
+ }
+ return litmus.succeed();
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
index 0d7f5ca..9ba7ba4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
@@ -73,11 +73,11 @@ public class HashJoinPrule extends JoinPruleBase {
if(isDist){
createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN,
- left, right, null /* left collation */, null /* right collation */, hashSingleKey, isSemi);
+ left, right, null /* left collation */, null /* right collation */, hashSingleKey);
}else{
if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
createBroadcastPlan(call, join, join.getCondition(), PhysicalJoinType.HASH_JOIN,
- left, right, null /* left collation */, null /* right collation */, isSemi);
+ left, right, null /* left collation */, null /* right collation */);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
index a589fcc..5678798 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
@@ -118,13 +118,14 @@ public abstract class JoinPruleBase extends Prule {
if (implementAsRowKeyJoin) {
newJoin = new RowKeyJoinPrel(join.getCluster(), traitsLeft,
convertedLeft, convertedRight, join.getCondition(),
- join.getJoinType());
+ join.getJoinType(), join.isSemiJoin());
} else {
newJoin = new HashJoinPrel(join.getCluster(), traitsLeft,
convertedLeft, convertedRight, join.getCondition(),
join.getJoinType(), false /* no swap */,
null /* no runtime filter */,
- true /* useful for join-restricted scans */, JoinControl.DEFAULT);
+ true /* useful for join-restricted scans */,
+ JoinControl.DEFAULT, join.isSemiJoin());
}
}
if (newJoin != null) {
@@ -136,7 +137,7 @@ public abstract class JoinPruleBase extends Prule {
PhysicalJoinType physicalJoinType,
RelNode left, RelNode right,
RelCollation collationLeft, RelCollation collationRight,
- boolean hashSingleKey, boolean semiJoin)throws InvalidRelException {
+ boolean hashSingleKey)throws InvalidRelException {
/* If join keys are l1 = r1 and l2 = r2 and ... l_k = r_k, then consider the following options of plan:
* 1) Plan1: distributed by (l1, l2, ..., l_k) for left side and by (r1, r2, ..., r_k) for right side.
@@ -153,7 +154,7 @@ public abstract class JoinPruleBase extends Prule {
DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
- createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition, semiJoin);
+ createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition);
assert (join.getLeftKeys().size() == join.getRightKeys().size());
@@ -167,7 +168,7 @@ public abstract class JoinPruleBase extends Prule {
hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys().subList(i, i+1))));
hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys().subList(i, i+1))));
- createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition, semiJoin);
+ createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition);
}
}
}
@@ -179,7 +180,7 @@ public abstract class JoinPruleBase extends Prule {
PhysicalJoinType physicalJoinType,
RelNode left, RelNode right,
RelCollation collationLeft, RelCollation collationRight,
- DrillDistributionTrait hashLeftPartition, DrillDistributionTrait hashRightPartition, boolean isSemiJoin) throws InvalidRelException {
+ DrillDistributionTrait hashLeftPartition, DrillDistributionTrait hashRightPartition) throws InvalidRelException {
RelTraitSet traitsLeft = null;
RelTraitSet traitsRight = null;
@@ -202,12 +203,12 @@ public abstract class JoinPruleBase extends Prule {
final RelTraitSet traitSet = PrelUtil.removeCollation(traitsLeft, call);
newJoin = new HashJoinPrel(join.getCluster(), traitSet,
convertedLeft, convertedRight, join.getCondition(),
- join.getJoinType(), isSemiJoin);
+ join.getJoinType(), join.isSemiJoin());
} else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft,
convertedLeft, convertedRight, join.getCondition(),
- join.getJoinType());
+ join.getJoinType(), join.isSemiJoin());
}
call.transformTo(newJoin);
}
@@ -219,7 +220,7 @@ public abstract class JoinPruleBase extends Prule {
final RexNode joinCondition,
final PhysicalJoinType physicalJoinType,
final RelNode left, final RelNode right,
- final RelCollation collationLeft, final RelCollation collationRight, boolean semiJoin) throws InvalidRelException {
+ final RelCollation collationLeft, final RelCollation collationRight) throws InvalidRelException {
DrillDistributionTrait distBroadcastRight = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);
RelTraitSet traitsRight = null;
@@ -265,7 +266,7 @@ public abstract class JoinPruleBase extends Prule {
RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
RelNode newLeft = convert(left, newTraitsLeft);
return new HashJoinPrel(join.getCluster(), newTraitsLeft, newLeft, convertedRight, joinCondition,
- join.getJoinType(), semiJoin);
+ join.getJoinType(), join.isSemiJoin());
}
@@ -288,11 +289,11 @@ public abstract class JoinPruleBase extends Prule {
} else {
if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
call.transformTo(new MergeJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft,
- convertedRight, joinCondition, join.getJoinType()));
+ convertedRight, joinCondition, join.getJoinType(), join.isSemiJoin()));
} else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
final RelTraitSet traitSet = PrelUtil.removeCollation(convertedLeft.getTraitSet(), call);
call.transformTo(new HashJoinPrel(join.getCluster(), traitSet, convertedLeft,
- convertedRight, joinCondition, join.getJoinType(), semiJoin));
+ convertedRight, joinCondition, join.getJoinType(), join.isSemiJoin()));
} else if (physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) {
call.transformTo(new NestedLoopJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft,
convertedRight, joinCondition, join.getJoinType()));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
index b293c47..195abbb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
@@ -50,6 +50,11 @@ public class MergeJoinPrel extends JoinPrel {
joincategory = JoinUtils.getJoinCategory(left, right, condition, leftKeys, rightKeys, filterNulls);
}
+ /**
+ * Creates a merge join that also forwards the semi-join flag to the {@code JoinPrel}
+ * super constructor; otherwise it performs the same join categorization as the preceding constructor.
+ *
+ * @param semijoin passed unchanged to {@code super(...)}
+ * @throws InvalidRelException as declared; propagated from construction (source of the throw not visible here)
+ */
+ public MergeJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+ JoinRelType joinType, boolean semijoin) throws InvalidRelException {
+ super(cluster, traits, left, right, condition, joinType, semijoin);
+ // Same categorization line as the other constructor. Presumably leftKeys/rightKeys/filterNulls
+ // are populated by the JoinPrel super constructor — TODO confirm.
+ joincategory = JoinUtils.getJoinCategory(left, right, condition, leftKeys, rightKeys, filterNulls);
+ }
@Override
public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
index 0bd2568..f06b66d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
@@ -70,11 +70,11 @@ public class MergeJoinPrule extends JoinPruleBase {
RelCollation collationRight = getCollation(join.getRightKeys());
if(isDist){
- createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight, hashSingleKey, false);
+ createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight, hashSingleKey);
}else{
if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
createBroadcastPlan(call, join, join.getCondition(), PhysicalJoinType.MERGE_JOIN,
- left, right, collationLeft, collationRight, false);
+ left, right, collationLeft, collationRight);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java
index b184eab..283b36d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java
@@ -46,6 +46,11 @@ public class NestedLoopJoinPrel extends JoinPrel {
super(cluster, traits, left, right, condition, joinType);
}
+ /**
+ * Creates a nested-loop join that forwards the semi-join flag to the {@code JoinPrel} super constructor.
+ *
+ * NOTE(review): JoinPruleBase.createBroadcastPlan still builds NestedLoopJoinPrel through the
+ * overload without the semi-join flag, so that path does not use this constructor — confirm intended.
+ *
+ * @param semijoin passed unchanged to {@code super(...)}
+ * @throws InvalidRelException as declared; propagated from the super constructor call
+ */
+ public NestedLoopJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+ JoinRelType joinType, boolean semijoin) throws InvalidRelException {
+ super(cluster, traits, left, right, condition, joinType, semijoin);
+ }
+
@Override
public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
index e7fc032..6caea82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
@@ -94,7 +94,7 @@ public class NestedLoopJoinPrule extends JoinPruleBase {
if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
createBroadcastPlan(call, join, join.getCondition(), PhysicalJoinType.NESTEDLOOP_JOIN,
- left, right, null /* left collation */, null /* right collation */, false);
+ left, right, null /* left collation */, null /* right collation */);
}
} catch (InvalidRelException e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java
index 7e8f77e..54c2230 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java
@@ -47,6 +47,12 @@ public class RowKeyJoinPrel extends JoinPrel implements Prel {
Preconditions.checkArgument(joinType == JoinRelType.INNER);
}
+ /**
+ * Creates a row-key join that forwards the semi-join flag to the {@code JoinPrel} super constructor.
+ * Like the existing constructor, only INNER joins are accepted.
+ *
+ * @param isSemiJoin passed unchanged to {@code super(...)}
+ * @throws IllegalArgumentException if {@code joinType} is not {@code JoinRelType.INNER}
+ * @throws InvalidRelException as declared; propagated from the super constructor call
+ */
+ public RowKeyJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
+ RexNode condition, JoinRelType joinType, boolean isSemiJoin) throws InvalidRelException {
+ super(cluster, traits, left, right, condition, joinType, isSemiJoin);
+ // Name the rejected join type so a planner failure is diagnosable from the exception alone.
+ Preconditions.checkArgument(joinType == JoinRelType.INNER,
+ "RowKeyJoinPrel supports only INNER joins, got %s", joinType);
+ }
+
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
PhysicalOperator leftPop = ((Prel)left).getPhysicalOperator(creator);
@@ -67,7 +73,8 @@ public class RowKeyJoinPrel extends JoinPrel implements Prel {
public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right,
JoinRelType joinType, boolean semiJoinDone) {
try {
- RowKeyJoinPrel rkj = new RowKeyJoinPrel(this.getCluster(), traitSet, left, right, conditionExpr, joinType);
+ RowKeyJoinPrel rkj = new RowKeyJoinPrel(this.getCluster(), traitSet, left, right, conditionExpr,
+ joinType, isSemiJoin());
rkj.setEstimatedRowCount(this.estimatedRowCount);
return rkj;
} catch (InvalidRelException e) {