You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@drill.apache.org by jn...@apache.org on 2017/01/24 06:15:25 UTC

[5/5] drill git commit: DRILL-5164: Equi-join query results in CompileException when inputs have a large number of columns.

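DRILL-5164: Equi-join query results in CompileException when inputs have a large number of columns.

The hash, merge, and nested-loop join code generators now call rotateBlock() after emitting each column's copy statement. The generated copy code is therefore split across several methods instead of accumulating in a single method that can exceed the JVM's per-method bytecode size limit.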

close apache/drill#711


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2af709f4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2af709f4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2af709f4

Branch: refs/heads/master
Commit: 2af709f43d01f341b2a52c6473ea49d6761fdc61
Parents: 8217697
Author: Serhii-Harnyk <se...@gmail.com>
Authored: Tue Dec 27 16:20:37 2016 +0000
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Mon Jan 23 17:08:58 2017 -0800

----------------------------------------------------------------------
 .../exec/physical/impl/join/HashJoinBatch.java  |  4 +-
 .../exec/physical/impl/join/MergeJoinBatch.java |  2 +
 .../physical/impl/join/NestedLoopJoinBatch.java |  4 +-
 .../exec/compile/TestLargeFileCompilation.java  | 94 +++++++++++++++++---
 4 files changed, 87 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/2af709f4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 23741b0..f1f81fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -439,7 +439,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
             .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
             .arg(outIndex)
             .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))));
-
+        g.rotateBlock();
         fieldId++;
       }
     }
@@ -475,7 +475,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
         final JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
 
         g.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV));
-
+        g.rotateBlock();
         fieldId++;
         outputFieldId++;
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/2af709f4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index a9bb479..c351517 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -345,6 +345,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
           .arg(copyLeftMapping.getValueReadIndex())
           .arg(copyLeftMapping.getValueWriteIndex())
           .arg(vvIn));
+        cg.rotateBlock();
         ++vectorId;
       }
     }
@@ -373,6 +374,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
           .arg(copyRightMappping.getValueReadIndex())
           .arg(copyRightMappping.getValueWriteIndex())
           .arg(vvIn));
+        cg.rotateBlock();
         ++vectorId;
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/2af709f4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 2e92c8d..bdd9f0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -247,7 +247,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
       JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(fieldType, false, outputFieldId));
 
       nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(leftIndex).arg(outIndex).arg(inVV));
-
+      nLJClassGenerator.rotateBlock();
       fieldId++;
       outputFieldId++;
     }
@@ -270,7 +270,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
           .arg(recordIndexWithinBatch)
           .arg(outIndex)
           .arg(inVV.component(batchIndex)));
-
+      nLJClassGenerator.rotateBlock();
       fieldId++;
       outputFieldId++;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/2af709f4/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
index 35bd4c9..8416d73 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
@@ -39,27 +39,33 @@ public class TestLargeFileCompilation extends BaseTestQuery {
 
   private static final String LARGE_QUERY_SELECT_LIST;
 
+  private static final String QUERY_WITH_JOIN;
+
+  private static final String LARGE_TABLE_WRITER;
+
   private static final int ITERATION_COUNT = Integer.valueOf(System.getProperty("TestLargeFileCompilation.iteration", "1"));
 
-  private static final int NUM_PROJECT_COULMNS = 2000;
+  private static final int NUM_PROJECT_COLUMNS = 2000;
+
+  private static final int NUM_ORDERBY_COLUMNS = 500;
 
-  private static final int NUM_ORDERBY_COULMNS = 500;
+  private static final int NUM_GROUPBY_COLUMNS = 225;
 
-  private static final int NUM_GROUPBY_COULMNS = 225;
+  private static final int NUM_FILTER_COLUMNS = 150;
 
-  private static final int NUM_FILTER_COULMNS = 150;
+  private static final int NUM_JOIN_TABLE_COLUMNS = 500;
 
   static {
     StringBuilder sb = new StringBuilder("select\n\t");
-    for (int i = 0; i < NUM_GROUPBY_COULMNS; i++) {
+    for (int i = 0; i < NUM_GROUPBY_COLUMNS; i++) {
       sb.append("c").append(i).append(", ");
     }
     sb.append("full_name\nfrom (select\n\t");
-    for (int i = 0; i < NUM_GROUPBY_COULMNS; i++) {
+    for (int i = 0; i < NUM_GROUPBY_COLUMNS; i++) {
       sb.append("employee_id+").append(i).append(" as c").append(i).append(", ");
     }
     sb.append("full_name\nfrom cp.`employee.json`)\ngroup by\n\t");
-    for (int i = 0; i < NUM_GROUPBY_COULMNS; i++) {
+    for (int i = 0; i < NUM_GROUPBY_COLUMNS; i++) {
       sb.append("c").append(i).append(", ");
     }
     LARGE_QUERY_GROUP_BY = sb.append("full_name").toString();
@@ -67,7 +73,7 @@ public class TestLargeFileCompilation extends BaseTestQuery {
 
   static {
     StringBuilder sb = new StringBuilder("select\n\t");
-    for (int i = 0; i < NUM_PROJECT_COULMNS; i++) {
+    for (int i = 0; i < NUM_PROJECT_COLUMNS; i++) {
       sb.append("employee_id+").append(i).append(" as col").append(i).append(", ");
     }
     sb.append("full_name\nfrom cp.`employee.json`\n\n\t");
@@ -76,11 +82,11 @@ public class TestLargeFileCompilation extends BaseTestQuery {
 
   static {
     StringBuilder sb = new StringBuilder("select\n\t");
-    for (int i = 0; i < NUM_PROJECT_COULMNS; i++) {
+    for (int i = 0; i < NUM_PROJECT_COLUMNS; i++) {
       sb.append("employee_id+").append(i).append(" as col").append(i).append(", ");
     }
     sb.append("full_name\nfrom cp.`employee.json`\norder by\n\t");
-    for (int i = 0; i < NUM_ORDERBY_COULMNS; i++) {
+    for (int i = 0; i < NUM_ORDERBY_COLUMNS; i++) {
       sb.append(" col").append(i).append(", ");
     }
     LARGE_QUERY_ORDER_BY = sb.append("full_name").toString();
@@ -91,18 +97,24 @@ public class TestLargeFileCompilation extends BaseTestQuery {
     StringBuilder sb = new StringBuilder("select *\n")
       .append("from cp.`employee.json`\n")
       .append("where");
-    for (int i = 0; i < NUM_FILTER_COULMNS; i++) {
+    for (int i = 0; i < NUM_FILTER_COLUMNS; i++) {
       sb.append(" employee_id+").append(i).append(" < employee_id ").append(i%2==0?"OR":"AND");
     }
     LARGE_QUERY_FILTER = sb.append(" true") .toString();
   }
 
   static {
+    LARGE_QUERY_WRITER = createTableWithColsCount(NUM_PROJECT_COLUMNS);
+    LARGE_TABLE_WRITER = createTableWithColsCount(NUM_JOIN_TABLE_COLUMNS);
+    QUERY_WITH_JOIN = "select * from %1$s t1, %1$s t2 where t1.col1 = t2.col1";
+  }
+
+  private static String createTableWithColsCount(int columnsCount) {
     StringBuilder sb = new StringBuilder("create table %s as (select \n");
-    for (int i = 0; i < NUM_PROJECT_COULMNS; i++) {
+    for (int i = 0; i < columnsCount; i++) {
       sb.append("employee_id+").append(i).append(" as col").append(i).append(", ");
     }
-    LARGE_QUERY_WRITER = sb.append("full_name\nfrom cp.`employee.json` limit 1)").toString();
+    return sb.append("full_name\nfrom cp.`employee.json` limit 1)").toString();
   }
 
   @Test
@@ -150,4 +162,60 @@ public class TestLargeFileCompilation extends BaseTestQuery {
     testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
     testNoResult(ITERATION_COUNT, LARGE_QUERY_SELECT_LIST);
   }
+
+  @Test
+  public void testHashJoin() throws Exception {
+    String tableName = "wide_table_hash_join";
+    try {
+      testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+      testNoResult("alter session set `planner.enable_mergejoin` = false");
+      testNoResult("alter session set `planner.enable_nestedloopjoin` = false");
+      testNoResult("use dfs_test.tmp");
+      testNoResult(LARGE_TABLE_WRITER, tableName);
+      testNoResult(QUERY_WITH_JOIN, tableName);
+    } finally {
+      testNoResult("alter session reset `planner.enable_mergejoin`");
+      testNoResult("alter session reset `planner.enable_nestedloopjoin`");
+      testNoResult("alter session reset `%s`", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+      testNoResult("drop table if exists %s", tableName);
+    }
+  }
+
+  @Test
+  public void testMergeJoin() throws Exception {
+    String tableName = "wide_table_merge_join";
+    try {
+      testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+      testNoResult("alter session set `planner.enable_hashjoin` = false");
+      testNoResult("alter session set `planner.enable_nestedloopjoin` = false");
+      testNoResult("use dfs_test.tmp");
+      testNoResult(LARGE_TABLE_WRITER, tableName);
+      testNoResult(QUERY_WITH_JOIN, tableName);
+    } finally {
+      testNoResult("alter session reset `planner.enable_hashjoin`");
+      testNoResult("alter session reset `planner.enable_nestedloopjoin`");
+      testNoResult("alter session reset `%s`", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+      testNoResult("drop table if exists %s", tableName);
+    }
+  }
+
+  @Test
+  public void testNestedLoopJoin() throws Exception {
+    String tableName = "wide_table_loop_join";
+    try {
+      testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+      testNoResult("alter session set `planner.enable_nljoin_for_scalar_only` = false");
+      testNoResult("alter session set `planner.enable_hashjoin` = false");
+      testNoResult("alter session set `planner.enable_mergejoin` = false");
+      testNoResult("use dfs_test.tmp");
+      testNoResult(LARGE_TABLE_WRITER, tableName);
+      testNoResult(QUERY_WITH_JOIN, tableName);
+    } finally {
+      testNoResult("alter session reset `planner.enable_nljoin_for_scalar_only`");
+      testNoResult("alter session reset `planner.enable_hashjoin`");
+      testNoResult("alter session reset `planner.enable_mergejoin`");
+      testNoResult("alter session reset `%s`", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+      testNoResult("drop table if exists %s", tableName);
+    }
+  }
 }