You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/08/01 07:58:57 UTC

[04/14] git commit: TAJO-968: Self-Join query (including partitioned table) doesn't run unexpectedly using auto broadcast join. (jaehwa)

TAJO-968: Self-Join query (including partitioned table) doesn't run unexpectedly using auto broadcast join. (jaehwa)

Closes #88


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/72808e06
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/72808e06
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/72808e06

Branch: refs/heads/index_support
Commit: 72808e06f02cbb0bd7d9cf345544c60205cf34b0
Parents: 326be45
Author: blrunner <bl...@apache.org>
Authored: Thu Jul 24 11:27:18 2014 +0900
Committer: blrunner <bl...@apache.org>
Committed: Thu Jul 24 11:27:18 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../apache/tajo/worker/TaskAttemptContext.java  | 26 +++++++++++-
 .../tajo/engine/query/TestJoinBroadcast.java    | 42 +++++++++++++++++---
 3 files changed, 64 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/72808e06/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a67625d..08cf60a 100644
--- a/CHANGES
+++ b/CHANGES
@@ -97,6 +97,9 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-968: Self-Join query (including partitioned table) doesn't run unexpectedly 
+    using auto broadcast join. (jaewha)
+
     TAJO-914: join queries with constant values can cause schema mismatch in
     logical plan. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/72808e06/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 1f0c410..db4af45 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -31,8 +31,10 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.TUtil;
 
 import java.io.File;
 import java.util.*;
@@ -234,10 +236,30 @@ public class TaskAttemptContext {
       tableFragments = new ArrayList<FragmentProto>();
     }
 
+    List<Path> paths = fragmentToPath(tableFragments);
+
     for (FragmentProto eachFragment: fragments) {
-      tableFragments.add(eachFragment);
+      FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment);
+      // If current attempt already has same path, we don't need to add it to fragments.
+      if (!paths.contains(fileFragment.getPath())) {
+        tableFragments.add(eachFragment);
+      }
     }
-    fragmentMap.put(tableId, tableFragments);
+
+    if (tableFragments.size() > 0) {
+      fragmentMap.put(tableId, tableFragments);
+    }
+  }
+
+  private List<Path> fragmentToPath(List<FragmentProto> tableFragments) {
+    List<Path> list = TUtil.newList();
+
+    for (FragmentProto proto : tableFragments) {
+      FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
+      list.add(fragment.getPath());
+    }
+
+    return list;
   }
 
   public Path getWorkDir() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/72808e06/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
index 9cc65bc..5df6f24 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
@@ -18,8 +18,6 @@
 
 package org.apache.tajo.engine.query;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.*;
 import org.apache.tajo.catalog.*;
@@ -43,13 +41,11 @@ import org.junit.experimental.categories.Category;
 import java.io.File;
 import java.sql.ResultSet;
 
-import static junit.framework.TestCase.*;
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.*;
 
 @Category(IntegrationTest.class)
 public class TestJoinBroadcast extends QueryTestCaseBase {
-  private static final Log LOG = LogFactory.getLog(TestJoinBroadcast.class);
   public TestJoinBroadcast() throws Exception {
     super(TajoConstants.DEFAULT_DATABASE_NAME);
     testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO.varname, "true");
@@ -615,4 +611,40 @@ public class TestJoinBroadcast extends QueryTestCaseBase {
       executeString("DROP TABLE table_large PURGE").close();
     }
   }
+
+
+  @Test
+  public final void testSelfJoin() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("paritioned_nation");
+    ResultSet res = executeString(
+        "create table " + tableName + " (n_name text,"
+            + "  n_comment text, n_regionkey int8) USING csv "
+            + "WITH ('csvfile.delimiter'='|')"
+            + "PARTITION BY column(n_nationkey int8)");
+    res.close();
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+    res = executeString(
+        "insert overwrite into " + tableName
+            + " select n_name, n_comment, n_regionkey, n_nationkey from nation");
+    res.close();
+
+    res = executeString(
+      "select a.n_nationkey, a.n_name from nation a join nation b on a.n_nationkey = b.n_nationkey"
+      + " where a.n_nationkey in (1)");
+    String expected = resultSetToString(res);
+    res.close();
+
+    res = executeString(
+      "select a.n_nationkey, a.n_name from " + tableName + " a join "+tableName +
+      " b on a.n_nationkey = b.n_nationkey "
+      + " where a.n_nationkey in (1)");
+    String resultSetData = resultSetToString(res);
+    res.close();
+
+    assertEquals(expected, resultSetData);
+
+  }
+
+
 }