You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/21 13:12:59 UTC

[5/9] git commit: TAJO-748: Shuffle output numbers of join may be inconsistent. (jaehwa)

TAJO-748: Shuffle output numbers of join may be inconsistent. (jaehwa)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/2b27f7de
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/2b27f7de
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/2b27f7de

Branch: refs/heads/window_function
Commit: 2b27f7de70904552d38801f57aa12396a9df75ac
Parents: f1f36ec
Author: blrunner <jh...@gruter.com>
Authored: Mon Apr 21 16:11:13 2014 +0900
Committer: blrunner <jh...@gruter.com>
Committed: Mon Apr 21 16:11:13 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |  2 +-
 .../engine/planner/global/GlobalPlanner.java    | 49 +++++++++++++++++++-
 .../tajo/master/querymaster/SubQuery.java       | 18 +++++++
 .../tajo/engine/query/TestJoinBroadcast.java    | 14 +++---
 .../tajo/master/TestExecutionBlockCursor.java   |  2 +-
 .../querymaster/TestQueryUnitStatusUpdate.java  |  8 ++--
 .../resources/queries/TestNetTypes/testJoin.sql |  2 +-
 .../testBroadcastSubquery2.result               |  2 +-
 9 files changed, 82 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d6ed95a..0fcc83d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -334,6 +334,8 @@ Release 0.8.0 - unreleased
 
   BUG FIXES
 
+    TAJO-748: Shuffle output numbers of join may be inconsistent. (jaehwa)
+
     TAJO-777: Partition column in function parameter occurs NPE.
     (Hyoungjun Kim via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 3c81ed5..5b3d4b3 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -161,7 +161,7 @@ public class TajoConf extends Configuration {
     //////////////////////////////////////////
     // Distributed Query Execution Parameters
     //////////////////////////////////////////
-    DIST_QUERY_BROADCAST_JOIN_AUTO("tajo.dist-query.join.broadcast.auto", true),
+    DIST_QUERY_BROADCAST_JOIN_AUTO("tajo.dist-query.join.auto-broadcast", true),
     DIST_QUERY_BROADCAST_JOIN_THRESHOLD("tajo.dist-query.join.broadcast.threshold-bytes", (long)5 * 1048576),
 
     DIST_QUERY_JOIN_TASK_VOLUME("tajo.dist-query.join.task-volume-mb", 128),

http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index bf2bf7d..edc08fc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -210,6 +210,10 @@ public class GlobalPlanner {
     throw new PlanningException("Invalid State");
   }
 
+  private static boolean checkIfCanBeOneOfBroadcastJoin(LogicalNode node) {
+    return node.getType() == NodeType.SCAN || node.getType() == NodeType.PARTITIONS_SCAN;
+  }
+
   private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode,
                                         ExecutionBlock leftBlock, ExecutionBlock rightBlock)
       throws PlanningException {
@@ -218,6 +222,7 @@ public class GlobalPlanner {
 
     boolean autoBroadcast = conf.getBoolVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO);
 
+    // to check when the tajo.dist-query.join.auto-broadcast property is true
     if (autoBroadcast && joinNode.isCandidateBroadcast()) {
       long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD);
       List<LogicalNode> broadtargetTables = new ArrayList<LogicalNode>();
@@ -234,7 +239,6 @@ public class GlobalPlanner {
         }
       }
 
-      //large table must be one
       if (numLargeTables <= 1 && !broadtargetTables.isEmpty()) {
         currentBlock = masterPlan.newExecutionBlock();
         currentBlock.setPlan(joinNode);
@@ -250,6 +254,49 @@ public class GlobalPlanner {
       }
     }
 
+    LogicalNode leftNode = joinNode.getLeftChild();
+    LogicalNode rightNode = joinNode.getRightChild();
+
+    boolean leftBroadcasted = false;
+    boolean rightBroadcasted = false;
+
+    // Although the broadcast join property is false, we still need to handle broadcast join.
+    // This is required so that the shuffle output numbers of the join are consistent.
+    if (checkIfCanBeOneOfBroadcastJoin(leftNode) && checkIfCanBeOneOfBroadcastJoin(rightNode)) {
+      ScanNode leftScan = (ScanNode) leftNode;
+      ScanNode rightScan = (ScanNode) rightNode;
+
+      TableDesc leftDesc = leftScan.getTableDesc();
+      TableDesc rightDesc = rightScan.getTableDesc();
+      long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD);
+
+      if (leftDesc.getStats().getNumBytes() < broadcastThreshold) {
+        leftBroadcasted = true;
+      }
+      if (rightDesc.getStats().getNumBytes() < broadcastThreshold) {
+        rightBroadcasted = true;
+      }
+
+      if (leftBroadcasted || rightBroadcasted) {
+        currentBlock = masterPlan.newExecutionBlock();
+        currentBlock.setPlan(joinNode);
+        if (leftBroadcasted) {
+          currentBlock.addBroadcastTable(leftScan.getCanonicalName());
+          LOG.info("The left table " + rightScan.getCanonicalName() + " ("
+              + rightScan.getTableDesc().getStats().getNumBytes() + ") is marked a broadcasted table");
+        }
+        if (rightBroadcasted) {
+          currentBlock.addBroadcastTable(rightScan.getCanonicalName());
+          LOG.info("The right table " + rightScan.getCanonicalName() + " ("
+              + rightScan.getTableDesc().getStats().getNumBytes() + ") is marked a broadcasted table");
+        }
+
+        context.execBlockMap.remove(leftScan.getPID());
+        context.execBlockMap.remove(rightScan.getPID());
+        return currentBlock;
+      }
+    }
+
     // symmetric repartition join
     currentBlock = masterPlan.newExecutionBlock();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 63b50ac..8929e8d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -742,6 +742,24 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         taskNum = Math.min(taskNum, slots);
         LOG.info(subQuery.getId() + ", The determined number of join partitions is " + taskNum);
 
+        // The shuffle output numbers of join may be inconsistent by execution block order.
+        // Thus, we need to compare the number with DataChannel output numbers.
+        // If the number is right, the number and DataChannel output numbers will be consistent.
+        int outerShuffleOutptNum = 0, innerShuffleOutputNum = 0;
+        for (DataChannel eachChannel : masterPlan.getOutgoingChannels(outer.getId())) {
+          outerShuffleOutptNum = Math.max(outerShuffleOutptNum, eachChannel.getShuffleOutputNum());
+        }
+
+        for (DataChannel eachChannel : masterPlan.getOutgoingChannels(inner.getId())) {
+          innerShuffleOutputNum = Math.max(innerShuffleOutputNum, eachChannel.getShuffleOutputNum());
+        }
+
+        if (outerShuffleOutptNum != innerShuffleOutputNum
+            && taskNum != outerShuffleOutptNum
+            && taskNum != innerShuffleOutputNum) {
+          taskNum = Math.max(outerShuffleOutptNum, innerShuffleOutputNum);
+        }
+
         return taskNum;
 
         // Is this subquery the first step of group-by?

http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
index 89519ef..f5f98a5 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
@@ -365,13 +365,11 @@ public class TestJoinBroadcast extends QueryTestCaseBase {
     cleanupQuery(res);
   }
 
-  // It doesn't run as expected because of TAJO-747 bug.
-  // Thus, we need to block this method until resolving this bug.
-//  @Test
-//  public final void testBroadcastSubquery2() throws Exception {
-//    ResultSet res = executeQuery();
-//    assertResultSet(res);
-//    cleanupQuery(res);
-//  }
+  @Test
+  public final void testBroadcastSubquery2() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index ab31c8d..f4fa74a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -118,6 +118,6 @@ public class TestExecutionBlockCursor {
       count++;
     }
 
-    assertEquals(10, count);
+    assertEquals(6, count);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
index 07b4ac5..fa89dc3 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java
@@ -86,11 +86,11 @@ public class TestQueryUnitStatusUpdate extends QueryTestCaseBase {
 
       res = executeQuery();
 
-      long[] expectedNumRows = new long[]{2, 2, 5, 5, 7, 2, 2, 2};
-      long[] expectedNumBytes = new long[]{18, 34, 45, 75, 109, 34, 34, 18};
-      long[] expectedReadBytes = new long[]{18, 0, 45, 0, 109, 0, 34, 0};
+      long[] expectedNumRows = new long[]{7, 2, 2, 2, 7, 2, 2, 2};
+      long[] expectedNumBytes = new long[]{63, 34, 34, 18, 109, 34, 34, 18};
+      long[] expectedReadBytes = new long[]{63, 0, 34, 0, 109, 0, 34, 0};
 
-      assertStatus(4, expectedNumRows, expectedNumBytes, expectedReadBytes);
+      assertStatus(2, expectedNumRows, expectedNumBytes, expectedReadBytes);
     } finally {
       cleanupQuery(res);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/test/resources/queries/TestNetTypes/testJoin.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestNetTypes/testJoin.sql b/tajo-core/src/test/resources/queries/TestNetTypes/testJoin.sql
index ec4f8e6..22c97d5 100644
--- a/tajo-core/src/test/resources/queries/TestNetTypes/testJoin.sql
+++ b/tajo-core/src/test/resources/queries/TestNetTypes/testJoin.sql
@@ -1 +1 @@
-select t1.*,t2.* from table1 as t1, table2 as t2 where t1.addr = t2.addr order by t2.name;
\ No newline at end of file
+select t1.*,t2.* from table1 as t1, table2 as t2 where t1.addr = t2.addr order by t1.id, t1.name,t2. name;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/2b27f7de/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastSubquery2.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastSubquery2.result b/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastSubquery2.result
index 14c2211..9368976 100644
--- a/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastSubquery2.result
+++ b/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastSubquery2.result
@@ -1,3 +1,3 @@
 ?sum
 -------------------------------
-360.0
\ No newline at end of file
+190.0
\ No newline at end of file