You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2018/06/22 21:14:34 UTC
[drill] 03/09: DRILL-6491: Prevent merge join for full outer join at planning stage
This is an automated email from the ASF dual-hosted git repository.
vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 9299fcc29dd9ac320b62b986f74423e8e6d1b750
Author: Bohdan Kazydub <bo...@gmail.com>
AuthorDate: Tue Jun 12 18:58:24 2018 +0300
DRILL-6491: Prevent merge join for full outer join at planning stage
closes #1320
---
.../drill/exec/planner/physical/MergeJoinPrel.java | 3 +-
.../physical/impl/join/TestMergeJoinAdvanced.java | 160 +++++++++++++--------
2 files changed, 99 insertions(+), 64 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
index 52e8921..2928be2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
@@ -65,7 +65,8 @@ public class MergeJoinPrel extends JoinPrel {
if (PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
return super.computeSelfCost(planner, mq).multiplyBy(.1);
}
- if (joincategory == JoinCategory.CARTESIAN || joincategory == JoinCategory.INEQUALITY) {
+ if (joincategory == JoinCategory.CARTESIAN || joincategory == JoinCategory.INEQUALITY
+ || getJoinType() == JoinRelType.FULL) {
return planner.getCostFactory().makeInfiniteCost();
}
double leftRowCount = mq.getRowCount(this.getLeft());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
index 147323d..d8bdea9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
@@ -17,16 +17,20 @@
*/
package org.apache.drill.exec.physical.impl.join;
+import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.test.TestTools;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.junit.AfterClass;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.Ignore;
import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TestRule;
import java.io.BufferedWriter;
@@ -48,67 +52,69 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
@Rule
public final TestRule TIMEOUT = TestTools.getTimeoutRule(120000); // Longer timeout than usual.
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
- // Have to disable hash join to test merge join in this class
@BeforeClass
- public static void enableMergeJoin() throws Exception {
- test(DISABLE_HJ);
-
+ public static void init() {
leftFile = new File(dirTestWatcher.getRootDir(), LEFT);
rightFile = new File(dirTestWatcher.getRootDir(), RIGHT);
dirTestWatcher.copyResourceToRoot(Paths.get("join"));
}
- @AfterClass
- public static void disableMergeJoin() throws Exception {
- test(ENABLE_HJ);
+ // Have to disable hash join to test merge join in this class
+ @Before
+ public void disableHashJoin() throws Exception {
+ test(DISABLE_HJ);
+ }
+
+ @After
+ public void enableHashJoin() throws Exception {
+ test(RESET_HJ);
}
@Test
public void testJoinWithDifferentTypesInCondition() throws Exception {
- String query = "select t1.full_name from cp.`employee.json` t1, cp.`department.json` t2 " +
- "where cast(t1.department_id as double) = t2.department_id and t1.employee_id = 1";
-
- testBuilder()
- .sqlQuery(query)
- .optionSettingQueriesForTestQuery(ENABLE_HJ)
- .unOrdered()
- .baselineColumns("full_name")
- .baselineValues("Sheri Nowmer")
- .go();
-
-
- query = "select t1.bigint_col from cp.`jsoninput/implicit_cast_join_1.json` t1, cp.`jsoninput/implicit_cast_join_1.json` t2 " +
- " where t1.bigint_col = cast(t2.bigint_col as int) and" + // join condition with bigint and int
- " t1.double_col = cast(t2.double_col as float) and" + // join condition with double and float
- " t1.bigint_col = cast(t2.bigint_col as double)"; // join condition with bigint and double
-
- testBuilder()
- .sqlQuery(query)
- .optionSettingQueriesForTestQuery(ENABLE_HJ)
- .unOrdered()
- .baselineColumns("bigint_col")
- .baselineValues(1l)
- .go();
-
- query = "select count(*) col1 from " +
+ String query = "select count(*) col1 from " +
"(select t1.date_opt from cp.`parquet/date_dictionary.parquet` t1, cp.`parquet/timestamp_table.parquet` t2 " +
"where t1.date_opt = t2.timestamp_col)"; // join condition contains date and timestamp
-
- testBuilder()
- .sqlQuery(query)
+ testBuilder().sqlQuery(query)
.unOrdered()
.baselineColumns("col1")
- .baselineValues(4l)
+ .baselineValues(4L)
.go();
+
+ try {
+ test(ENABLE_HJ);
+
+ query = "select t1.full_name from cp.`employee.json` t1, cp.`department.json` t2 " +
+ "where cast(t1.department_id as double) = t2.department_id and t1.employee_id = 1";
+ testBuilder().sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("full_name")
+ .baselineValues("Sheri Nowmer")
+ .go();
+
+
+ query = "select t1.bigint_col from cp.`jsoninput/implicit_cast_join_1.json` t1, cp.`jsoninput/implicit_cast_join_1.json` t2 " +
+ " where t1.bigint_col = cast(t2.bigint_col as int) and" + // join condition with bigint and int
+ " t1.double_col = cast(t2.double_col as float) and" + // join condition with double and float
+ " t1.bigint_col = cast(t2.bigint_col as double)"; // join condition with bigint and double
+ testBuilder().sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("bigint_col")
+ .baselineValues(1L)
+ .go();
+ } finally {
+ test(RESET_HJ);
+ }
}
@Test
@Ignore // TODO file JIRA to fix this
public void testFix2967() throws Exception {
setSessionOption(PlannerSettings.BROADCAST.getOptionName(), "false");
- setSessionOption(PlannerSettings.HASHJOIN.getOptionName(), "false");
setSessionOption(ExecConstants.SLICE_TARGET, "1");
setSessionOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, "23");
@@ -143,11 +149,10 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
final BufferedWriter leftWriter = new BufferedWriter(new FileWriter(leftFile));
final BufferedWriter rightWriter = new BufferedWriter(new FileWriter(rightFile));
generateData(leftWriter, rightWriter, left, right);
- final String query1 = String.format("select count(*) c1 from dfs.`%s` L %s join dfs.`%s` R on L.k=R.k1",
+ final String query = String.format("select count(*) c1 from dfs.`%s` L %s join dfs.`%s` R on L.k=R.k1",
LEFT, joinType, RIGHT);
testBuilder()
- .sqlQuery(query1)
- .optionSettingQueriesForTestQuery(DISABLE_HJ)
+ .sqlQuery(query)
.unOrdered()
.baselineColumns("c1")
.baselineValues(expected)
@@ -156,32 +161,32 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
@Test
public void testMergeInnerJoinLargeRight() throws Exception {
- testMultipleBatchJoin(1000l, 5000l, "inner", 5000l * 1000l);
+ testMultipleBatchJoin(1000L, 5000L, "inner", 5000L * 1000L);
}
@Test
public void testMergeLeftJoinLargeRight() throws Exception {
- testMultipleBatchJoin(1000l, 5000l, "left", 5000l * 1000l +2l);
+ testMultipleBatchJoin(1000L, 5000L, "left", 5000L * 1000L +2L);
}
@Test
public void testMergeRightJoinLargeRight() throws Exception {
- testMultipleBatchJoin(1000l, 5000l, "right", 5000l*1000l +3l);
+ testMultipleBatchJoin(1000L, 5000L, "right", 5000L * 1000L +3L);
}
@Test
public void testMergeInnerJoinLargeLeft() throws Exception {
- testMultipleBatchJoin(5000l, 1000l, "inner", 5000l*1000l);
+ testMultipleBatchJoin(5000L, 1000L, "inner", 5000L * 1000L);
}
@Test
public void testMergeLeftJoinLargeLeft() throws Exception {
- testMultipleBatchJoin(5000l, 1000l, "left", 5000l*1000l + 2l);
+ testMultipleBatchJoin(5000L, 1000L, "left", 5000L * 1000L + 2L);
}
@Test
public void testMergeRightJoinLargeLeft() throws Exception {
- testMultipleBatchJoin(5000l, 1000l, "right", 5000l*1000l + 3l);
+ testMultipleBatchJoin(5000L, 1000L, "right", 5000L * 1000L + 3L);
}
// Following tests can take some time.
@@ -189,37 +194,38 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
@Ignore
public void testMergeInnerJoinRandomized() throws Exception {
final Random r = new Random();
- final long right = r.nextInt(10001) + 1l;
- final long left = r.nextInt(10001) + 1l;
- testMultipleBatchJoin(left, right, "inner", left*right);
+ final long right = r.nextInt(10001) + 1L;
+ final long left = r.nextInt(10001) + 1L;
+ testMultipleBatchJoin(left, right, "inner", left * right);
}
@Test
@Ignore
public void testMergeLeftJoinRandomized() throws Exception {
final Random r = new Random();
- final long right = r.nextInt(10001) + 1l;
- final long left = r.nextInt(10001) + 1l;
- testMultipleBatchJoin(left, right, "left", left*right + 2l);
+ final long right = r.nextInt(10001) + 1L;
+ final long left = r.nextInt(10001) + 1L;
+ testMultipleBatchJoin(left, right, "left", left * right + 2L);
}
@Test
@Ignore
public void testMergeRightJoinRandomized() throws Exception {
final Random r = new Random();
- final long right = r.nextInt(10001) + 1l;
- final long left = r.nextInt(10001) + 1l;
- testMultipleBatchJoin(left, right, "right", left * right + 3l);
+ final long right = r.nextInt(10001) + 1L;
+ final long left = r.nextInt(10001) + 1L;
+ testMultipleBatchJoin(left, right, "right", left * right + 3L);
}
@Test
public void testDrill4165() throws Exception {
- final String query1 = "select count(*) cnt from cp.`tpch/lineitem.parquet` l1, cp.`tpch/lineitem.parquet` l2 where l1.l_partkey = l2.l_partkey and l1.l_suppkey < 30 and l2.l_suppkey < 30";
+ final String query = "select count(*) cnt from cp.`tpch/lineitem.parquet` l1, cp.`tpch/lineitem.parquet` l2 " +
+ "where l1.l_partkey = l2.l_partkey and l1.l_suppkey < 30 and l2.l_suppkey < 30";
testBuilder()
- .sqlQuery(query1)
+ .sqlQuery(query)
.unOrdered()
.baselineColumns("cnt")
- .baselineValues(202452l)
+ .baselineValues(202452L)
.go();
}
@@ -244,11 +250,10 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
leftWriter.close();
rightWriter.close();
- final String query1 = String.format("select count(*) c1 from dfs.`%s` L %s join dfs.`%s` R on L.k=R.k1",
+ final String query = String.format("select count(*) c1 from dfs.`%s` L %s join dfs.`%s` R on L.k=R.k1",
LEFT, "inner", RIGHT);
testBuilder()
- .sqlQuery(query1)
- .optionSettingQueriesForTestQuery(DISABLE_HJ)
+ .sqlQuery(query)
.unOrdered()
.baselineColumns("c1")
.baselineValues(6000*800L)
@@ -269,4 +274,33 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
public void testMergeRightJoinWithEmptyTable() throws Exception {
testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "right outer", new String[] {MJ_PATTERN, RIGHT_JOIN_TYPE}, 0L);
}
+
+ @Test // DRILL-6491
+ public void testMergeIsNotSelectedForFullJoin() throws Exception {
+ try {
+ test(ENABLE_HJ);
+
+ String query = "select * " +
+ " from (select employee_id from cp.`employee.json` order by employee_id) e1 " +
+ " full outer join (select employee_id from cp.`employee.json` order by employee_id) e2 " +
+ " on e1.employee_id = e2.employee_id " +
+ " limit 10";
+ testPlanMatchingPatterns(query, null, new String[]{MJ_PATTERN});
+ } finally {
+ test(RESET_HJ);
+ }
+ }
+
+ @Test // DRILL-6491
+ public void testFullJoinIsNotSupported() throws Exception {
+ thrown.expect(UserRemoteException.class);
+ thrown.expectMessage(CoreMatchers.containsString("SYSTEM ERROR: CannotPlanException"));
+
+ String query = "select * " +
+ " from (select employee_id from cp.`employee.json` order by employee_id) e1 " +
+ " full outer join (select employee_id from cp.`employee.json` order by employee_id) e2 " +
+ " on e1.employee_id = e2.employee_id " +
+ " limit 10";
+ test(query);
+ }
}