You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/11 13:35:14 UTC

git commit: TAJO-242: Enable omitted broadcast join feature after TAJO-184. (hyunsik)

Updated Branches:
  refs/heads/master 517355dc5 -> d01e47d3f


TAJO-242: Enable omitted broadcast join feature after TAJO-184. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/d01e47d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/d01e47d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/d01e47d3

Branch: refs/heads/master
Commit: d01e47d3feaa7581024d2ac01760e52f1df9fab1
Parents: 517355d
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Oct 11 20:34:53 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Oct 11 20:34:53 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  5 ++-
 .../tajo-core-backend/benchmark/tpch/q9.tql     |  2 +-
 .../org/apache/tajo/master/GlobalPlanner.java   | 34 ++++++++++++++++++++
 .../tajo/master/querymaster/Repartitioner.java  |  7 ++--
 .../tajo/master/TestExecutionBlockCursor.java   |  6 ++--
 5 files changed, 47 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d01e47d3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 55a0087..0b700dd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -197,7 +197,10 @@ Release 0.2.0 - unreleased
 
   BUG FIXES
 
-    TAJO-240: Reformat HiveConverter source code to match adopted conventions (jaehwa)   
+    TAJO-242: Enable omitted broadcast join feature after TAJO-184. (hyunsik)
+
+    TAJO-240: Reformat HiveConverter source code to match adopted conventions 
+    (jaehwa)   
 
     TAJO-241: Does not reconnect the meta store. (jinho)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d01e47d3/tajo-core/tajo-core-backend/benchmark/tpch/q9.tql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/benchmark/tpch/q9.tql b/tajo-core/tajo-core-backend/benchmark/tpch/q9.tql
index c22523e..06c69ed 100644
--- a/tajo-core/tajo-core-backend/benchmark/tpch/q9.tql
+++ b/tajo-core/tajo-core-backend/benchmark/tpch/q9.tql
@@ -21,7 +21,7 @@ select
       and p_partkey = l_partkey
       and o_orderkey = l_orderkey
       and s_nationkey = n_nationkey
-      and p_name like '%[COLOR]%'
+      and p_name like 'green'
   ) as profit
 group by 
   nation,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d01e47d3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
index 4e0b945..6e4ab25 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
@@ -164,6 +164,39 @@ public class GlobalPlanner {
     LogicalNode leftNode = joinNode.getLeftChild();
     LogicalNode rightNode = joinNode.getRightChild();
 
+    boolean leftBroadcasted = false;
+    boolean rightBroadcasted = false;
+
+    if (leftNode.getType() == NodeType.SCAN && rightNode.getType() == NodeType.SCAN ) {
+      ScanNode leftScan = (ScanNode) leftNode;
+      ScanNode rightScan = (ScanNode) rightNode;
+
+      TableMeta leftMeta = leftScan.getTableDesc().getMeta();
+      TableMeta rightMeta = rightScan.getTableDesc().getMeta();
+      long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.BROADCAST_JOIN_THRESHOLD);
+
+      if (leftMeta.getStat().getNumBytes() < broadcastThreshold) {
+        leftBroadcasted = true;
+      }
+      if (rightMeta.getStat().getNumBytes() < broadcastThreshold) {
+        rightBroadcasted = true;
+      }
+
+      if (leftBroadcasted || rightBroadcasted) {
+        currentBlock = masterPlan.newExecutionBlock();
+        currentBlock.setPlan(joinNode);
+        if (leftBroadcasted) {
+          currentBlock.addBroadcastTable(leftScan.getCanonicalName());
+        }
+        if (rightBroadcasted) {
+          currentBlock.addBroadcastTable(rightScan.getCanonicalName());
+        }
+        return new ExecutionBlock[] { currentBlock, childBlock };
+      }
+    }
+
+    // symmetric repartition join
+
     ExecutionBlock leftBlock;
     if (lastChildBlock == null) {
       leftBlock = masterPlan.newExecutionBlock();
@@ -190,6 +223,7 @@ public class GlobalPlanner {
     masterPlan.addConnect(rightChannel);
 
     return new ExecutionBlock[] { currentBlock, childBlock };
+
   }
 
   private ExecutionBlock [] buildGroupBy(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode currentNode,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d01e47d3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 7a956e5..194cf09 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -74,9 +74,6 @@ public class Repartitioner {
     AbstractStorageManager storageManager = subQuery.getStorageManager();
 
     ScanNode[] scans = execBlock.getScanNodes();
-    ExecutionBlock [] childBlocks = new ExecutionBlock[2];
-    childBlocks[0] = masterPlan.getChild(execBlock.getId(), 0);
-    childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);
 
     Path tablePath;
     Fragment [] fragments = new Fragment[2];
@@ -87,6 +84,10 @@ public class Repartitioner {
       TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName());
       if (tableDesc == null) { // if it is a real table stored on storage
         // TODO - to be fixed (wrong directory)
+        ExecutionBlock [] childBlocks = new ExecutionBlock[2];
+        childBlocks[0] = masterPlan.getChild(execBlock.getId(), 0);
+        childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);
+
         tablePath = storageManager.getTablePath(scans[i].getTableName());
         stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getTableStat();
         fragments[i] = new Fragment(scans[i].getCanonicalName(), tablePath,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d01e47d3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index 6ffc532..0424200 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -25,6 +25,7 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStat;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.LogicalOptimizer;
@@ -61,6 +62,7 @@ public class TestExecutionBlockCursor {
     tpch.loadOutSchema();
     for (String table : tpch.getTableNames()) {
       TableMeta m = CatalogUtil.newTableMeta(tpch.getSchema(table), CatalogProtos.StoreType.CSV);
+      m.setStat(new TableStat());
       TableDesc d = CatalogUtil.newTableDesc(table, m, new Path("file:///"));
       catalog.addTable(d);
     }
@@ -103,7 +105,7 @@ public class TestExecutionBlockCursor {
       count++;
     }
 
-    // 4 input relations, 4 join, and 1 terminal = 9 execution blocks
-    assertEquals(10, count);
+    // 4 input relations, 1 broadcast join, 2 symmetric repartition joins, and 1 terminal = 8 execution blocks
+    assertEquals(8, count);
   }
 }