You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/11 13:35:14 UTC
git commit: TAJO-242: Enable omitted broadcast join feature after TAJO-184. (hyunsik)
Updated Branches:
refs/heads/master 517355dc5 -> d01e47d3f
TAJO-242: Enable omitted broadcast join feature after TAJO-184. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/d01e47d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/d01e47d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/d01e47d3
Branch: refs/heads/master
Commit: d01e47d3feaa7581024d2ac01760e52f1df9fab1
Parents: 517355d
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Oct 11 20:34:53 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Oct 11 20:34:53 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 5 ++-
.../tajo-core-backend/benchmark/tpch/q9.tql | 2 +-
.../org/apache/tajo/master/GlobalPlanner.java | 34 ++++++++++++++++++++
.../tajo/master/querymaster/Repartitioner.java | 7 ++--
.../tajo/master/TestExecutionBlockCursor.java | 6 ++--
5 files changed, 47 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d01e47d3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 55a0087..0b700dd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -197,7 +197,10 @@ Release 0.2.0 - unreleased
BUG FIXES
- TAJO-240: Reformat HiveConverter source code to match adopted conventions (jaehwa)
+ TAJO-242: Enable omitted broadcast join feature after TAJO-184. (hyunsik)
+
+ TAJO-240: Reformat HiveConverter source code to match adopted conventions
+ (jaehwa)
TAJO-241: Does not reconnect the meta store. (jinho)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d01e47d3/tajo-core/tajo-core-backend/benchmark/tpch/q9.tql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/benchmark/tpch/q9.tql b/tajo-core/tajo-core-backend/benchmark/tpch/q9.tql
index c22523e..06c69ed 100644
--- a/tajo-core/tajo-core-backend/benchmark/tpch/q9.tql
+++ b/tajo-core/tajo-core-backend/benchmark/tpch/q9.tql
@@ -21,7 +21,7 @@ select
and p_partkey = l_partkey
and o_orderkey = l_orderkey
and s_nationkey = n_nationkey
- and p_name like '%[COLOR]%'
+ and p_name like 'green'
) as profit
group by
nation,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d01e47d3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
index 4e0b945..6e4ab25 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
@@ -164,6 +164,39 @@ public class GlobalPlanner {
LogicalNode leftNode = joinNode.getLeftChild();
LogicalNode rightNode = joinNode.getRightChild();
+ boolean leftBroadcasted = false;
+ boolean rightBroadcasted = false;
+
+ if (leftNode.getType() == NodeType.SCAN && rightNode.getType() == NodeType.SCAN ) {
+ ScanNode leftScan = (ScanNode) leftNode;
+ ScanNode rightScan = (ScanNode) rightNode;
+
+ TableMeta leftMeta = leftScan.getTableDesc().getMeta();
+ TableMeta rightMeta = rightScan.getTableDesc().getMeta();
+ long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.BROADCAST_JOIN_THRESHOLD);
+
+ if (leftMeta.getStat().getNumBytes() < broadcastThreshold) {
+ leftBroadcasted = true;
+ }
+ if (rightMeta.getStat().getNumBytes() < broadcastThreshold) {
+ rightBroadcasted = true;
+ }
+
+ if (leftBroadcasted || rightBroadcasted) {
+ currentBlock = masterPlan.newExecutionBlock();
+ currentBlock.setPlan(joinNode);
+ if (leftBroadcasted) {
+ currentBlock.addBroadcastTable(leftScan.getCanonicalName());
+ }
+ if (rightBroadcasted) {
+ currentBlock.addBroadcastTable(rightScan.getCanonicalName());
+ }
+ return new ExecutionBlock[] { currentBlock, childBlock };
+ }
+ }
+
+ // symmetric repartition join
+
ExecutionBlock leftBlock;
if (lastChildBlock == null) {
leftBlock = masterPlan.newExecutionBlock();
@@ -190,6 +223,7 @@ public class GlobalPlanner {
masterPlan.addConnect(rightChannel);
return new ExecutionBlock[] { currentBlock, childBlock };
+
}
private ExecutionBlock [] buildGroupBy(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode currentNode,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d01e47d3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 7a956e5..194cf09 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -74,9 +74,6 @@ public class Repartitioner {
AbstractStorageManager storageManager = subQuery.getStorageManager();
ScanNode[] scans = execBlock.getScanNodes();
- ExecutionBlock [] childBlocks = new ExecutionBlock[2];
- childBlocks[0] = masterPlan.getChild(execBlock.getId(), 0);
- childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);
Path tablePath;
Fragment [] fragments = new Fragment[2];
@@ -87,6 +84,10 @@ public class Repartitioner {
TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName());
if (tableDesc == null) { // if it is a real table stored on storage
// TODO - to be fixed (wrong directory)
+ ExecutionBlock [] childBlocks = new ExecutionBlock[2];
+ childBlocks[0] = masterPlan.getChild(execBlock.getId(), 0);
+ childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);
+
tablePath = storageManager.getTablePath(scans[i].getTableName());
stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getTableStat();
fragments[i] = new Fragment(scans[i].getCanonicalName(), tablePath,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d01e47d3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index 6ffc532..0424200 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -25,6 +25,7 @@ import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStat;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.LogicalOptimizer;
@@ -61,6 +62,7 @@ public class TestExecutionBlockCursor {
tpch.loadOutSchema();
for (String table : tpch.getTableNames()) {
TableMeta m = CatalogUtil.newTableMeta(tpch.getSchema(table), CatalogProtos.StoreType.CSV);
+ m.setStat(new TableStat());
TableDesc d = CatalogUtil.newTableDesc(table, m, new Path("file:///"));
catalog.addTable(d);
}
@@ -103,7 +105,7 @@ public class TestExecutionBlockCursor {
count++;
}
- // 4 input relations, 4 join, and 1 terminal = 9 execution blocks
- assertEquals(10, count);
+ // 4 input relations, 1 broadcast join and 2 symmetric repartition joins and 1 terminal = 8 execution blocks
+ assertEquals(8, count);
}
}