You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@drill.apache.org by vi...@apache.org on 2018/06/22 21:14:34 UTC

[drill] 03/09: DRILL-6491: Prevent merge join for full outer join at planning stage

This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 9299fcc29dd9ac320b62b986f74423e8e6d1b750
Author: Bohdan Kazydub <bo...@gmail.com>
AuthorDate: Tue Jun 12 18:58:24 2018 +0300

    DRILL-6491: Prevent merge join for full outer join at planning stage
    
    closes #1320
---
 .../drill/exec/planner/physical/MergeJoinPrel.java |   3 +-
 .../physical/impl/join/TestMergeJoinAdvanced.java  | 160 +++++++++++++--------
 2 files changed, 99 insertions(+), 64 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
index 52e8921..2928be2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
@@ -65,7 +65,8 @@ public class MergeJoinPrel  extends JoinPrel {
     if (PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
       return super.computeSelfCost(planner, mq).multiplyBy(.1);
     }
-    if (joincategory == JoinCategory.CARTESIAN || joincategory == JoinCategory.INEQUALITY) {
+    if (joincategory == JoinCategory.CARTESIAN || joincategory == JoinCategory.INEQUALITY
+        || getJoinType() == JoinRelType.FULL) {
       return planner.getCostFactory().makeInfiniteCost();
     }
     double leftRowCount = mq.getRowCount(this.getLeft());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
index 147323d..d8bdea9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
@@ -17,16 +17,20 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
+import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.test.TestTools;
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.junit.AfterClass;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.Ignore;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TestRule;
 
 import java.io.BufferedWriter;
@@ -48,67 +52,69 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
 
   @Rule
   public final TestRule TIMEOUT = TestTools.getTimeoutRule(120000); // Longer timeout than usual.
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
 
-  // Have to disable hash join to test merge join in this class
   @BeforeClass
-  public static void enableMergeJoin() throws Exception {
-    test(DISABLE_HJ);
-
+  public static void init() {
     leftFile = new File(dirTestWatcher.getRootDir(), LEFT);
     rightFile = new File(dirTestWatcher.getRootDir(), RIGHT);
 
     dirTestWatcher.copyResourceToRoot(Paths.get("join"));
   }
 
-  @AfterClass
-  public static void disableMergeJoin() throws Exception {
-    test(ENABLE_HJ);
+  // Have to disable hash join to test merge join in this class
+  @Before
+  public void disableHashJoin() throws Exception {
+    test(DISABLE_HJ);
+  }
+
+  @After
+  public void enableHashJoin() throws Exception {
+    test(RESET_HJ);
   }
 
   @Test
   public void testJoinWithDifferentTypesInCondition() throws Exception {
-    String query = "select t1.full_name from cp.`employee.json` t1, cp.`department.json` t2 " +
-        "where cast(t1.department_id as double) = t2.department_id and t1.employee_id = 1";
-
-    testBuilder()
-        .sqlQuery(query)
-        .optionSettingQueriesForTestQuery(ENABLE_HJ)
-        .unOrdered()
-        .baselineColumns("full_name")
-        .baselineValues("Sheri Nowmer")
-        .go();
-
-
-    query = "select t1.bigint_col from cp.`jsoninput/implicit_cast_join_1.json` t1, cp.`jsoninput/implicit_cast_join_1.json` t2 " +
-        " where t1.bigint_col = cast(t2.bigint_col as int) and" + // join condition with bigint and int
-        " t1.double_col  = cast(t2.double_col as float) and" + // join condition with double and float
-        " t1.bigint_col = cast(t2.bigint_col as double)"; // join condition with bigint and double
-
-    testBuilder()
-        .sqlQuery(query)
-        .optionSettingQueriesForTestQuery(ENABLE_HJ)
-        .unOrdered()
-        .baselineColumns("bigint_col")
-        .baselineValues(1l)
-        .go();
-
-    query = "select count(*) col1 from " +
+    String query = "select count(*) col1 from " +
         "(select t1.date_opt from cp.`parquet/date_dictionary.parquet` t1, cp.`parquet/timestamp_table.parquet` t2 " +
         "where t1.date_opt = t2.timestamp_col)"; // join condition contains date and timestamp
-
-    testBuilder()
-        .sqlQuery(query)
+    testBuilder().sqlQuery(query)
         .unOrdered()
         .baselineColumns("col1")
-        .baselineValues(4l)
+        .baselineValues(4L)
         .go();
+
+    try {
+      test(ENABLE_HJ);
+
+      query = "select t1.full_name from cp.`employee.json` t1, cp.`department.json` t2 " +
+          "where cast(t1.department_id as double) = t2.department_id and t1.employee_id = 1";
+      testBuilder().sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("full_name")
+          .baselineValues("Sheri Nowmer")
+          .go();
+
+
+      query = "select t1.bigint_col from cp.`jsoninput/implicit_cast_join_1.json` t1, cp.`jsoninput/implicit_cast_join_1.json` t2 " +
+          " where t1.bigint_col = cast(t2.bigint_col as int) and" + // join condition with bigint and int
+          " t1.double_col  = cast(t2.double_col as float) and" + // join condition with double and float
+          " t1.bigint_col = cast(t2.bigint_col as double)"; // join condition with bigint and double
+      testBuilder().sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("bigint_col")
+          .baselineValues(1L)
+          .go();
+    } finally {
+      test(RESET_HJ);
+    }
   }
 
   @Test
   @Ignore // TODO file JIRA to fix this
   public void testFix2967() throws Exception {
     setSessionOption(PlannerSettings.BROADCAST.getOptionName(), "false");
-    setSessionOption(PlannerSettings.HASHJOIN.getOptionName(), "false");
     setSessionOption(ExecConstants.SLICE_TARGET, "1");
     setSessionOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, "23");
 
@@ -143,11 +149,10 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
     final BufferedWriter leftWriter = new BufferedWriter(new FileWriter(leftFile));
     final BufferedWriter rightWriter = new BufferedWriter(new FileWriter(rightFile));
     generateData(leftWriter, rightWriter, left, right);
-    final String query1 = String.format("select count(*) c1 from dfs.`%s` L %s join dfs.`%s` R on L.k=R.k1",
+    final String query = String.format("select count(*) c1 from dfs.`%s` L %s join dfs.`%s` R on L.k=R.k1",
       LEFT, joinType, RIGHT);
     testBuilder()
-      .sqlQuery(query1)
-      .optionSettingQueriesForTestQuery(DISABLE_HJ)
+      .sqlQuery(query)
       .unOrdered()
       .baselineColumns("c1")
       .baselineValues(expected)
@@ -156,32 +161,32 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
 
   @Test
   public void testMergeInnerJoinLargeRight() throws Exception {
-    testMultipleBatchJoin(1000l, 5000l, "inner", 5000l * 1000l);
+    testMultipleBatchJoin(1000L, 5000L, "inner", 5000L * 1000L);
   }
 
   @Test
   public void testMergeLeftJoinLargeRight() throws Exception {
-    testMultipleBatchJoin(1000l, 5000l, "left", 5000l * 1000l +2l);
+    testMultipleBatchJoin(1000L, 5000L, "left", 5000L * 1000L +2L);
   }
 
   @Test
   public void testMergeRightJoinLargeRight() throws Exception {
-    testMultipleBatchJoin(1000l, 5000l, "right", 5000l*1000l +3l);
+    testMultipleBatchJoin(1000L, 5000L, "right", 5000L * 1000L +3L);
   }
 
   @Test
   public void testMergeInnerJoinLargeLeft() throws Exception {
-    testMultipleBatchJoin(5000l, 1000l, "inner", 5000l*1000l);
+    testMultipleBatchJoin(5000L, 1000L, "inner", 5000L * 1000L);
   }
 
   @Test
   public void testMergeLeftJoinLargeLeft() throws Exception {
-    testMultipleBatchJoin(5000l, 1000l, "left", 5000l*1000l + 2l);
+    testMultipleBatchJoin(5000L, 1000L, "left", 5000L * 1000L + 2L);
   }
 
   @Test
   public void testMergeRightJoinLargeLeft() throws Exception {
-    testMultipleBatchJoin(5000l, 1000l, "right", 5000l*1000l + 3l);
+    testMultipleBatchJoin(5000L, 1000L, "right", 5000L * 1000L + 3L);
   }
 
   // Following tests can take some time.
@@ -189,37 +194,38 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
   @Ignore
   public void testMergeInnerJoinRandomized() throws Exception {
     final Random r = new Random();
-    final long right = r.nextInt(10001) + 1l;
-    final long left = r.nextInt(10001) + 1l;
-    testMultipleBatchJoin(left, right, "inner", left*right);
+    final long right = r.nextInt(10001) + 1L;
+    final long left = r.nextInt(10001) + 1L;
+    testMultipleBatchJoin(left, right, "inner", left * right);
   }
 
   @Test
   @Ignore
   public void testMergeLeftJoinRandomized() throws Exception {
     final Random r = new Random();
-    final long right = r.nextInt(10001) + 1l;
-    final long left = r.nextInt(10001) + 1l;
-    testMultipleBatchJoin(left, right, "left", left*right + 2l);
+    final long right = r.nextInt(10001) + 1L;
+    final long left = r.nextInt(10001) + 1L;
+    testMultipleBatchJoin(left, right, "left", left * right + 2L);
   }
 
   @Test
   @Ignore
   public void testMergeRightJoinRandomized() throws Exception {
     final Random r = new Random();
-    final long right = r.nextInt(10001) + 1l;
-    final long left = r.nextInt(10001) + 1l;
-    testMultipleBatchJoin(left, right, "right", left * right + 3l);
+    final long right = r.nextInt(10001) + 1L;
+    final long left = r.nextInt(10001) + 1L;
+    testMultipleBatchJoin(left, right, "right", left * right + 3L);
   }
 
   @Test
   public void testDrill4165() throws Exception {
-    final String query1 = "select count(*) cnt from cp.`tpch/lineitem.parquet` l1, cp.`tpch/lineitem.parquet` l2 where l1.l_partkey = l2.l_partkey and l1.l_suppkey < 30 and l2.l_suppkey < 30";
+    final String query = "select count(*) cnt from cp.`tpch/lineitem.parquet` l1, cp.`tpch/lineitem.parquet` l2 " +
+        "where l1.l_partkey = l2.l_partkey and l1.l_suppkey < 30 and l2.l_suppkey < 30";
     testBuilder()
-      .sqlQuery(query1)
+      .sqlQuery(query)
       .unOrdered()
       .baselineColumns("cnt")
-      .baselineValues(202452l)
+      .baselineValues(202452L)
       .go();
   }
 
@@ -244,11 +250,10 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
     leftWriter.close();
     rightWriter.close();
 
-    final String query1 = String.format("select count(*) c1 from dfs.`%s` L %s join dfs.`%s` R on L.k=R.k1",
+    final String query = String.format("select count(*) c1 from dfs.`%s` L %s join dfs.`%s` R on L.k=R.k1",
       LEFT, "inner", RIGHT);
     testBuilder()
-      .sqlQuery(query1)
-      .optionSettingQueriesForTestQuery(DISABLE_HJ)
+      .sqlQuery(query)
       .unOrdered()
       .baselineColumns("c1")
       .baselineValues(6000*800L)
@@ -269,4 +274,33 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
   public void testMergeRightJoinWithEmptyTable() throws Exception {
     testJoinWithEmptyFile(dirTestWatcher.getRootDir(), "right outer", new String[] {MJ_PATTERN, RIGHT_JOIN_TYPE}, 0L);
   }
+
+  @Test // DRILL-6491
+  public void testMergeIsNotSelectedForFullJoin() throws Exception {
+    try {
+      test(ENABLE_HJ);
+
+      String query = "select * " +
+          " from (select employee_id from cp.`employee.json` order by employee_id) e1 " +
+          " full outer join (select employee_id from cp.`employee.json` order by employee_id) e2 " +
+          " on e1.employee_id = e2.employee_id " +
+          " limit 10";
+      testPlanMatchingPatterns(query, null, new String[]{MJ_PATTERN});
+    } finally {
+      test(RESET_HJ);
+    }
+  }
+
+  @Test // DRILL-6491
+  public void testFullJoinIsNotSupported() throws Exception {
+    thrown.expect(UserRemoteException.class);
+    thrown.expectMessage(CoreMatchers.containsString("SYSTEM ERROR: CannotPlanException"));
+
+    String query = "select * " +
+        " from (select employee_id from cp.`employee.json` order by employee_id) e1 " +
+        " full outer join (select employee_id from cp.`employee.json` order by employee_id) e2 " +
+        " on e1.employee_id = e2.employee_id " +
+        " limit 10";
+    test(query);
+  }
 }