You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/21 13:12:59 UTC
[5/9] git commit: TAJO-748: Shuffle output numbers of join may be
inconsistent. (jaehwa)
TAJO-748: Shuffle output numbers of join may be inconsistent. (jaehwa)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/2b27f7de
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/2b27f7de
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/2b27f7de
Branch: refs/heads/window_function
Commit: 2b27f7de70904552d38801f57aa12396a9df75ac
Parents: f1f36ec
Author: blrunner <jh...@gruter.com>
Authored: Mon Apr 21 16:11:13 2014 +0900
Committer: blrunner <jh...@gruter.com>
Committed: Mon Apr 21 16:11:13 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../java/org/apache/tajo/conf/TajoConf.java | 2 +-
.../engine/planner/global/GlobalPlanner.java | 49 +++++++++++++++++++-
.../tajo/master/querymaster/SubQuery.java | 18 +++++++
.../tajo/engine/query/TestJoinBroadcast.java | 14 +++---
.../tajo/master/TestExecutionBlockCursor.java | 2 +-
.../querymaster/TestQueryUnitStatusUpdate.java | 8 ++--
.../resources/queries/TestNetTypes/testJoin.sql | 2 +-
.../testBroadcastSubquery2.result | 2 +-
9 files changed, 82 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d6ed95a..0fcc83d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -334,6 +334,8 @@ Release 0.8.0 - unreleased
BUG FIXES
+ TAJO-748: Shuffle output numbers of join may be inconsistent. (jaehwa)
+
TAJO-777: Partition column in function parameter occurs NPE.
(Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 3c81ed5..5b3d4b3 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -161,7 +161,7 @@ public class TajoConf extends Configuration {
//////////////////////////////////////////
// Distributed Query Execution Parameters
//////////////////////////////////////////
- DIST_QUERY_BROADCAST_JOIN_AUTO("tajo.dist-query.join.broadcast.auto", true),
+ DIST_QUERY_BROADCAST_JOIN_AUTO("tajo.dist-query.join.auto-broadcast", true),
DIST_QUERY_BROADCAST_JOIN_THRESHOLD("tajo.dist-query.join.broadcast.threshold-bytes", (long)5 * 1048576),
DIST_QUERY_JOIN_TASK_VOLUME("tajo.dist-query.join.task-volume-mb", 128),
http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index bf2bf7d..edc08fc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -210,6 +210,10 @@ public class GlobalPlanner {
throw new PlanningException("Invalid State");
}
+ private static boolean checkIfCanBeOneOfBroadcastJoin(LogicalNode node) {
+ return node.getType() == NodeType.SCAN || node.getType() == NodeType.PARTITIONS_SCAN;
+ }
+
private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode,
ExecutionBlock leftBlock, ExecutionBlock rightBlock)
throws PlanningException {
@@ -218,6 +222,7 @@ public class GlobalPlanner {
boolean autoBroadcast = conf.getBoolVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO);
+ // Attempt an automatic broadcast join only when the tajo.dist-query.join.auto-broadcast property is true.
if (autoBroadcast && joinNode.isCandidateBroadcast()) {
long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD);
List<LogicalNode> broadtargetTables = new ArrayList<LogicalNode>();
@@ -234,7 +239,6 @@ public class GlobalPlanner {
}
}
- //large table must be one
if (numLargeTables <= 1 && !broadtargetTables.isEmpty()) {
currentBlock = masterPlan.newExecutionBlock();
currentBlock.setPlan(joinNode);
@@ -250,6 +254,49 @@ public class GlobalPlanner {
}
}
+ LogicalNode leftNode = joinNode.getLeftChild();
+ LogicalNode rightNode = joinNode.getRightChild();
+
+ boolean leftBroadcasted = false;
+ boolean rightBroadcasted = false;
+
+ // Even when the auto-broadcast property is false, we still need to handle a broadcast join between two scans.
+ // This is required so that the shuffle output numbers of the join stay consistent.
+ if (checkIfCanBeOneOfBroadcastJoin(leftNode) && checkIfCanBeOneOfBroadcastJoin(rightNode)) {
+ ScanNode leftScan = (ScanNode) leftNode;
+ ScanNode rightScan = (ScanNode) rightNode;
+
+ TableDesc leftDesc = leftScan.getTableDesc();
+ TableDesc rightDesc = rightScan.getTableDesc();
+ long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD);
+
+ if (leftDesc.getStats().getNumBytes() < broadcastThreshold) {
+ leftBroadcasted = true;
+ }
+ if (rightDesc.getStats().getNumBytes() < broadcastThreshold) {
+ rightBroadcasted = true;
+ }
+
+ if (leftBroadcasted || rightBroadcasted) {
+ currentBlock = masterPlan.newExecutionBlock();
+ currentBlock.setPlan(joinNode);
+ if (leftBroadcasted) {
+ currentBlock.addBroadcastTable(leftScan.getCanonicalName());
+ LOG.info("The left table " + rightScan.getCanonicalName() + " ("
+ + rightScan.getTableDesc().getStats().getNumBytes() + ") is marked a broadcasted table");
+ }
+ if (rightBroadcasted) {
+ currentBlock.addBroadcastTable(rightScan.getCanonicalName());
+ LOG.info("The right table " + rightScan.getCanonicalName() + " ("
+ + rightScan.getTableDesc().getStats().getNumBytes() + ") is marked a broadcasted table");
+ }
+
+ context.execBlockMap.remove(leftScan.getPID());
+ context.execBlockMap.remove(rightScan.getPID());
+ return currentBlock;
+ }
+ }
+
// symmetric repartition join
currentBlock = masterPlan.newExecutionBlock();
http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 63b50ac..8929e8d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -742,6 +742,24 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
taskNum = Math.min(taskNum, slots);
LOG.info(subQuery.getId() + ", The determined number of join partitions is " + taskNum);
+ // The shuffle output numbers of a join may be inconsistent depending on the execution block order.
+ // Thus, we need to compare the determined task number with the DataChannel shuffle output numbers.
+ // If the task number is right, it will be consistent with the DataChannel shuffle output numbers.
+ int outerShuffleOutptNum = 0, innerShuffleOutputNum = 0;
+ for (DataChannel eachChannel : masterPlan.getOutgoingChannels(outer.getId())) {
+ outerShuffleOutptNum = Math.max(outerShuffleOutptNum, eachChannel.getShuffleOutputNum());
+ }
+
+ for (DataChannel eachChannel : masterPlan.getOutgoingChannels(inner.getId())) {
+ innerShuffleOutputNum = Math.max(innerShuffleOutputNum, eachChannel.getShuffleOutputNum());
+ }
+
+ if (outerShuffleOutptNum != innerShuffleOutputNum
+ && taskNum != outerShuffleOutptNum
+ && taskNum != innerShuffleOutputNum) {
+ taskNum = Math.max(outerShuffleOutptNum, innerShuffleOutputNum);
+ }
+
return taskNum;
// Is this subquery the first step of group-by?
http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
index 89519ef..f5f98a5 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
@@ -365,13 +365,11 @@ public class TestJoinBroadcast extends QueryTestCaseBase {
cleanupQuery(res);
}
- // It doesn't run as expected because of TAJO-747 bug.
- // Thus, we need to block this method until resolving this bug.
-// @Test
-// public final void testBroadcastSubquery2() throws Exception {
-// ResultSet res = executeQuery();
-// assertResultSet(res);
-// cleanupQuery(res);
-// }
+ @Test
+ public final void testBroadcastSubquery2() throws Exception {
+ ResultSet res = executeQuery();
+ assertResultSet(res);
+ cleanupQuery(res);
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index ab31c8d..f4fa74a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -118,6 +118,6 @@ public class TestExecutionBlockCursor {
count++;
}
- assertEquals(10, count);
+ assertEquals(6, count);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
index 07b4ac5..fa89dc3 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
@@ -86,11 +86,11 @@ public class TestQueryUnitStatusUpdate extends QueryTestCaseBase {
res = executeQuery();
- long[] expectedNumRows = new long[]{2, 2, 5, 5, 7, 2, 2, 2};
- long[] expectedNumBytes = new long[]{18, 34, 45, 75, 109, 34, 34, 18};
- long[] expectedReadBytes = new long[]{18, 0, 45, 0, 109, 0, 34, 0};
+ long[] expectedNumRows = new long[]{7, 2, 2, 2, 7, 2, 2, 2};
+ long[] expectedNumBytes = new long[]{63, 34, 34, 18, 109, 34, 34, 18};
+ long[] expectedReadBytes = new long[]{63, 0, 34, 0, 109, 0, 34, 0};
- assertStatus(4, expectedNumRows, expectedNumBytes, expectedReadBytes);
+ assertStatus(2, expectedNumRows, expectedNumBytes, expectedReadBytes);
} finally {
cleanupQuery(res);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/test/resources/queries/TestNetTypes/testJoin.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestNetTypes/testJoin.sql b/tajo-core/src/test/resources/queries/TestNetTypes/testJoin.sql
index ec4f8e6..22c97d5 100644
--- a/tajo-core/src/test/resources/queries/TestNetTypes/testJoin.sql
+++ b/tajo-core/src/test/resources/queries/TestNetTypes/testJoin.sql
@@ -1 +1 @@
-select t1.*,t2.* from table1 as t1, table2 as t2 where t1.addr = t2.addr order by t2.name;
\ No newline at end of file
+select t1.*,t2.* from table1 as t1, table2 as t2 where t1.addr = t2.addr order by t1.id, t1.name,t2. name;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastSubquery2.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastSubquery2.result b/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastSubquery2.result
index 14c2211..9368976 100644
--- a/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastSubquery2.result
+++ b/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastSubquery2.result
@@ -1,3 +1,3 @@
?sum
-------------------------------
-360.0
\ No newline at end of file
+190.0
\ No newline at end of file