You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/08/01 07:58:57 UTC
[04/14] git commit: TAJO-968: Self-Join query (including partitioned
table) doesn't run unexpectedly using auto broadcast join. (jaehwa)
TAJO-968: Self-Join query (including partitioned table) doesn't run unexpectedly using auto broadcast join. (jaehwa)
Closes #88
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/72808e06
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/72808e06
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/72808e06
Branch: refs/heads/index_support
Commit: 72808e06f02cbb0bd7d9cf345544c60205cf34b0
Parents: 326be45
Author: blrunner <bl...@apache.org>
Authored: Thu Jul 24 11:27:18 2014 +0900
Committer: blrunner <bl...@apache.org>
Committed: Thu Jul 24 11:27:18 2014 +0900
----------------------------------------------------------------------
CHANGES | 3 ++
.../apache/tajo/worker/TaskAttemptContext.java | 26 +++++++++++-
.../tajo/engine/query/TestJoinBroadcast.java | 42 +++++++++++++++++---
3 files changed, 64 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/72808e06/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a67625d..08cf60a 100644
--- a/CHANGES
+++ b/CHANGES
@@ -97,6 +97,9 @@ Release 0.9.0 - unreleased
BUG FIXES
+ TAJO-968: Self-Join query (including partitioned table) doesn't run unexpectedly
+ using auto broadcast join. (jaewha)
+
TAJO-914: join queries with constant values can cause schema mismatch in
logical plan. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/72808e06/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 1f0c410..db4af45 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -31,8 +31,10 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.TUtil;
import java.io.File;
import java.util.*;
@@ -234,10 +236,30 @@ public class TaskAttemptContext {
tableFragments = new ArrayList<FragmentProto>();
}
+ List<Path> paths = fragmentToPath(tableFragments);
+
for (FragmentProto eachFragment: fragments) {
- tableFragments.add(eachFragment);
+ FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment);
+ // If current attempt already has same path, we don't need to add it to fragments.
+ if (!paths.contains(fileFragment.getPath())) {
+ tableFragments.add(eachFragment);
+ }
}
- fragmentMap.put(tableId, tableFragments);
+
+ if (tableFragments.size() > 0) {
+ fragmentMap.put(tableId, tableFragments);
+ }
+ }
+
+ private List<Path> fragmentToPath(List<FragmentProto> tableFragments) {
+ List<Path> list = TUtil.newList();
+
+ for (FragmentProto proto : tableFragments) {
+ FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
+ list.add(fragment.getPath());
+ }
+
+ return list;
}
public Path getWorkDir() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/72808e06/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
index 9cc65bc..5df6f24 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
@@ -18,8 +18,6 @@
package org.apache.tajo.engine.query;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.*;
import org.apache.tajo.catalog.*;
@@ -43,13 +41,11 @@ import org.junit.experimental.categories.Category;
import java.io.File;
import java.sql.ResultSet;
-import static junit.framework.TestCase.*;
import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.*;
@Category(IntegrationTest.class)
public class TestJoinBroadcast extends QueryTestCaseBase {
- private static final Log LOG = LogFactory.getLog(TestJoinBroadcast.class);
public TestJoinBroadcast() throws Exception {
super(TajoConstants.DEFAULT_DATABASE_NAME);
testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO.varname, "true");
@@ -615,4 +611,40 @@ public class TestJoinBroadcast extends QueryTestCaseBase {
executeString("DROP TABLE table_large PURGE").close();
}
}
+
+
+ @Test
+ public final void testSelfJoin() throws Exception {
+ String tableName = CatalogUtil.normalizeIdentifier("paritioned_nation");
+ ResultSet res = executeString(
+ "create table " + tableName + " (n_name text,"
+ + " n_comment text, n_regionkey int8) USING csv "
+ + "WITH ('csvfile.delimiter'='|')"
+ + "PARTITION BY column(n_nationkey int8)");
+ res.close();
+ assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+ res = executeString(
+ "insert overwrite into " + tableName
+ + " select n_name, n_comment, n_regionkey, n_nationkey from nation");
+ res.close();
+
+ res = executeString(
+ "select a.n_nationkey, a.n_name from nation a join nation b on a.n_nationkey = b.n_nationkey"
+ + " where a.n_nationkey in (1)");
+ String expected = resultSetToString(res);
+ res.close();
+
+ res = executeString(
+ "select a.n_nationkey, a.n_name from " + tableName + " a join "+tableName +
+ " b on a.n_nationkey = b.n_nationkey "
+ + " where a.n_nationkey in (1)");
+ String resultSetData = resultSetToString(res);
+ res.close();
+
+ assertEquals(expected, resultSetData);
+
+ }
+
+
}