You are viewing a plain-text version of this content. The link to the canonical version is not included in this plain-text rendering.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/05/27 13:17:10 UTC

[13/13] tajo git commit: TAJO-1553: Improve broadcast join planning. (jihoon)

TAJO-1553: Improve broadcast join planning. (jihoon)

Closes #583


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8fd9ae72
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8fd9ae72
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8fd9ae72

Branch: refs/heads/master
Commit: 8fd9ae72c70b24c25eaf17d60a832cc7bd7d252c
Parents: 4f3a46c
Author: Jihoon Son <ji...@apache.org>
Authored: Wed May 27 20:16:25 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Wed May 27 20:16:25 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../BroadcastJoinMarkCandidateVisitor.java      |  72 ----
 .../planner/BroadcastJoinPlanVisitor.java       |  60 ----
 .../engine/planner/PhysicalPlannerImpl.java     |   2 +-
 .../engine/planner/global/ExecutionBlock.java   | 117 ++++++-
 .../engine/planner/global/GlobalPlanner.java    | 263 +-------------
 .../tajo/engine/planner/global/MasterPlan.java  |  19 +
 .../global/builder/DistinctGroupbyBuilder.java  |   9 +-
 .../BaseGlobalPlanRewriteRuleProvider.java      |   6 +-
 .../rewriter/GlobalPlanRewriteEngine.java       |   5 +-
 .../global/rewriter/GlobalPlanRewriteRule.java  |   9 +-
 .../rewriter/rules/BroadcastJoinRule.java       | 348 +++++++++++++++++++
 .../rules/GlobalPlanEqualityTester.java         |   3 +-
 .../rewriter/rules/GlobalPlanRewriteUtil.java   | 216 ++++++++++++
 .../planner/physical/CommonHashJoinExec.java    |  13 +
 .../DistinctGroupbyThirdAggregationExec.java    |   2 +-
 .../NonForwardQueryResultSystemScanner.java     |   2 +-
 .../apache/tajo/master/exec/QueryExecutor.java  |   2 +-
 .../tajo/querymaster/DefaultTaskScheduler.java  |   2 +-
 .../tajo/querymaster/QueryMasterTask.java       |   2 +-
 .../apache/tajo/querymaster/Repartitioner.java  |  62 +---
 .../java/org/apache/tajo/querymaster/Stage.java |   7 +-
 .../org/apache/tajo/benchmark/TestTPCH.java     |  25 +-
 .../tajo/engine/query/TestGroupByQuery.java     |   6 +-
 .../engine/query/TestInnerJoinWithSubQuery.java |   7 +
 .../tajo/engine/query/TestOuterJoinQuery.java   |   8 +-
 .../apache/tajo/engine/query/TestSortQuery.java |   7 +
 .../tajo/engine/query/TestUnionQuery.java       |  13 +
 .../tajo/master/TestExecutionBlockCursor.java   |   2 +-
 .../apache/tajo/master/TestGlobalPlanner.java   | 347 ------------------
 .../apache/tajo/querymaster/TestKillQuery.java  |   4 +-
 .../testBroadcastTwoPartJoin.sql                |   2 +-
 .../testThetaJoinKeyPairs.sql                   |  20 ++
 .../testSubQuerySortAfterGroupMultiBlocks.sql   |   5 +
 .../resources/queries/TestTPCH/testTPCHQ5.sql   |  24 ++
 .../TestUnionQuery/testComplexUnion1.sql        |  27 ++
 .../TestUnionQuery/testComplexUnion2.sql        |  35 ++
 .../queries/default/complex_union_1.sql         |  29 --
 .../queries/default/complex_union_2.sql         |  35 --
 .../testBroadcastTwoPartJoin.Hash.plan          | 135 +++----
 .../testBroadcastTwoPartJoin.Sort.plan          | 135 +++----
 .../testComplexJoinCondition1.Hash.plan         |   5 +-
 .../testComplexJoinCondition1.Sort.plan         |   5 +-
 .../testComplexJoinCondition2.Hash.plan         |   5 +-
 .../testComplexJoinCondition2.Sort.plan         |   5 +-
 .../testComplexJoinCondition3.Hash.plan         |   5 +-
 .../testComplexJoinCondition3.Sort.plan         |   5 +-
 .../testComplexJoinCondition4.Hash.plan         |   5 +-
 .../testComplexJoinCondition4.Sort.plan         |   5 +-
 .../testCrossJoin.1.Hash.plan                   |   7 +-
 .../testCrossJoin.1.Sort.plan                   |   7 +-
 .../testCrossJoin.2.Hash.plan                   |   1 -
 .../testCrossJoin.2.Sort.plan                   |   1 -
 .../testCrossJoin.3.Hash.plan                   |   3 +-
 .../testCrossJoin.3.Sort.plan                   |   3 +-
 .../testCrossJoin.4.Hash.plan                   |   3 +-
 .../testCrossJoin.4.Sort.plan                   |   3 +-
 .../testCrossJoin.5.Hash.plan                   |   7 +-
 .../testCrossJoin.5.Sort.plan                   |   7 +-
 .../testCrossJoinAndCaseWhen.Hash.plan          |   5 +-
 .../testCrossJoinAndCaseWhen.Sort.plan          |   5 +-
 .../testCrossJoinWithAsterisk1.Hash.plan        |   5 +-
 .../testCrossJoinWithAsterisk1.Sort.plan        |   5 +-
 .../testCrossJoinWithAsterisk2.Hash.plan        |   7 +-
 .../testCrossJoinWithAsterisk2.Sort.plan        |   7 +-
 .../testCrossJoinWithAsterisk3.Hash.plan        |   7 +-
 .../testCrossJoinWithAsterisk3.Sort.plan        |   7 +-
 .../testCrossJoinWithAsterisk4.Hash.plan        |   7 +-
 .../testCrossJoinWithAsterisk4.Sort.plan        |   7 +-
 .../testCrossJoinWithEmptyTable1.Hash.plan      |   5 +-
 .../testCrossJoinWithEmptyTable1.Sort.plan      |   5 +-
 ...sJoinWithThetaJoinConditionInWhere.Hash.plan |   1 -
 ...sJoinWithThetaJoinConditionInWhere.Sort.plan |   1 -
 .../testDifferentTypesJoinCondition.Hash.plan   |   1 -
 .../testDifferentTypesJoinCondition.Sort.plan   |   1 -
 .../testInnerJoinAndCaseWhen.Hash.plan          |   5 +-
 .../testInnerJoinAndCaseWhen.Sort.plan          |   5 +-
 .../testInnerJoinWithEmptyTable.Hash.plan       |   5 +-
 .../testInnerJoinWithEmptyTable.Sort.plan       |   5 +-
 ...rJoinWithThetaJoinConditionInWhere.Hash.plan |   3 +-
 ...rJoinWithThetaJoinConditionInWhere.Sort.plan |   3 +-
 .../testJoinAsterisk.Hash.plan                  |   1 -
 .../testJoinAsterisk.Sort.plan                  |   1 -
 .../testJoinCoReferredEvals1.Hash.plan          |   5 +-
 .../testJoinCoReferredEvals1.Sort.plan          |   5 +-
 ...tJoinCoReferredEvalsWithSameExprs1.Hash.plan |   7 +-
 ...tJoinCoReferredEvalsWithSameExprs1.Sort.plan |   7 +-
 ...tJoinCoReferredEvalsWithSameExprs2.Hash.plan |  11 +-
 ...tJoinCoReferredEvalsWithSameExprs2.Sort.plan |  11 +-
 .../testJoinOnMultipleDatabases.Hash.plan       | 135 +++----
 .../testJoinOnMultipleDatabases.Sort.plan       | 135 +++----
 .../testJoinWithMultipleJoinQual1.Hash.plan     | 137 +++-----
 .../testJoinWithMultipleJoinQual1.Sort.plan     | 137 +++-----
 .../testJoinWithOrPredicates.Hash.plan          |   7 +-
 .../testJoinWithOrPredicates.Sort.plan          |   7 +-
 .../testNaturalJoin.Hash.plan                   |   7 +-
 .../testNaturalJoin.Sort.plan                   |   7 +-
 .../TestInnerJoinQuery/testTPCHQ2Join.Hash.plan | 137 +++-----
 .../TestInnerJoinQuery/testTPCHQ2Join.Sort.plan | 137 +++-----
 .../testWhereClauseJoin1.Hash.plan              |   5 +-
 .../testWhereClauseJoin1.Sort.plan              |   5 +-
 .../testWhereClauseJoin2.Hash.plan              |   5 +-
 .../testWhereClauseJoin2.Sort.plan              |   5 +-
 .../testWhereClauseJoin3.Hash.plan              |   5 +-
 .../testWhereClauseJoin3.Sort.plan              |   5 +-
 .../testWhereClauseJoin4.Hash.plan              |   7 +-
 .../testWhereClauseJoin4.Sort.plan              |   7 +-
 .../testWhereClauseJoin5.Hash.plan              |  33 +-
 .../testWhereClauseJoin5.Sort.plan              |  33 +-
 .../testWhereClauseJoin6.Hash.plan              |  37 +-
 .../testWhereClauseJoin6.Sort.plan              |  37 +-
 .../testBroadcastSubquery.Hash.plan             | 118 ++-----
 .../testBroadcastSubquery.Sort.plan             | 118 ++-----
 .../testBroadcastSubquery2.Hash.plan            | 180 ++++------
 .../testBroadcastSubquery2.Sort.plan            | 180 ++++------
 .../testComplexJoinCondition5.Hash.plan         |  69 ++--
 .../testComplexJoinCondition5.Sort.plan         |  69 ++--
 .../testComplexJoinCondition6.Hash.plan         |  94 ++---
 .../testComplexJoinCondition6.Sort.plan         |  94 ++---
 .../testComplexJoinCondition7.Hash.plan         |  94 ++---
 .../testComplexJoinCondition7.Sort.plan         |  94 ++---
 .../testJoinWithMultipleJoinQual2.Hash.plan     |  88 ++---
 .../testJoinWithMultipleJoinQual2.Sort.plan     |  88 ++---
 .../testJoinWithMultipleJoinQual3.Hash.plan     | 109 ++----
 .../testJoinWithMultipleJoinQual3.Sort.plan     | 109 ++----
 .../testJoinWithMultipleJoinQual4.Hash.plan     | 109 ++----
 .../testJoinWithMultipleJoinQual4.Sort.plan     | 109 ++----
 .../testThetaJoinKeyPairs.Hash.plan             | 142 ++++++++
 .../testThetaJoinKeyPairs.Hash_NoBroadcast.plan | 196 +++++++++++
 .../testThetaJoinKeyPairs.Sort.plan             | 142 ++++++++
 .../testThetaJoinKeyPairs.Sort_NoBroadcast.plan | 196 +++++++++++
 .../testThetaJoinKeyPairs.result                |  27 ++
 .../testComplexJoinsWithCaseWhen.Hash.plan      |  33 +-
 .../testComplexJoinsWithCaseWhen.Sort.plan      |  33 +-
 .../testComplexJoinsWithCaseWhen2.Hash.plan     |  91 ++---
 .../testComplexJoinsWithCaseWhen2.Sort.plan     |  91 ++---
 .../testInnerAndOuterWithEmpty.1.Hash.plan      |  17 +-
 .../testInnerAndOuterWithEmpty.1.Sort.plan      |  17 +-
 .../testJoinWithMultipleJoinTypes.Hash.plan     | 108 ++----
 .../testJoinWithMultipleJoinTypes.Sort.plan     | 108 ++----
 .../testFullOuterJoin1.Hash.plan                |  58 +++-
 .../testFullOuterJoin1.Sort.plan                |  58 +++-
 ...lOuterJoinPredicationCaseByCase1.1.Hash.plan | 153 ++++++++
 ...edicationCaseByCase1.1.Hash_NoBroadcast.plan | 153 ++++++++
 ...lOuterJoinPredicationCaseByCase1.1.Sort.plan | 153 ++++++++
 ...edicationCaseByCase1.1.Sort_NoBroadcast.plan | 153 ++++++++
 ...FullOuterJoinPredicationCaseByCase1.1.result |   9 +
 .../testFullOuterJoinWithEmptyTable1.Hash.plan  |  58 +++-
 .../testFullOuterJoinWithEmptyTable1.Sort.plan  |  58 +++-
 ...testJoinFilterOfRowPreservedTable1.Hash.plan |   4 +-
 ...testJoinFilterOfRowPreservedTable1.Sort.plan |   4 +-
 .../testLeftOuterJoin1.Hash.plan                |   4 +-
 .../testLeftOuterJoin1.Sort.plan                |   4 +-
 .../testLeftOuterJoin2.Hash.plan                |  14 +-
 .../testLeftOuterJoin2.Sort.plan                |  14 +-
 .../testLeftOuterJoin3.Hash.plan                |  22 +-
 .../testLeftOuterJoin3.Sort.plan                |  22 +-
 ...tOuterJoinPredicationCaseByCase1.1.Hash.plan |  14 +-
 ...tOuterJoinPredicationCaseByCase1.1.Sort.plan |  14 +-
 ...tOuterJoinPredicationCaseByCase2.1.Hash.plan |  18 +-
 ...tOuterJoinPredicationCaseByCase2.1.Sort.plan |  18 +-
 ...uterJoinPredicationCaseByCase2_1.1.Hash.plan |  21 +-
 ...uterJoinPredicationCaseByCase2_1.1.Sort.plan |  21 +-
 ...tOuterJoinPredicationCaseByCase3.1.Hash.plan |  14 +-
 ...tOuterJoinPredicationCaseByCase3.1.Sort.plan |  14 +-
 ...tOuterJoinPredicationCaseByCase4.1.Hash.plan |  14 +-
 ...tOuterJoinPredicationCaseByCase4.1.Sort.plan |  14 +-
 ...tOuterJoinPredicationCaseByCase5.1.Hash.plan |  14 +-
 ...tOuterJoinPredicationCaseByCase5.1.Sort.plan |  14 +-
 ...tOuterJoinPredicationCaseByCase6.1.Hash.plan |  14 +-
 ...tOuterJoinPredicationCaseByCase6.1.Sort.plan |  14 +-
 ...testLeftOuterJoinWithConstantExpr1.Hash.plan |   4 +-
 ...testLeftOuterJoinWithConstantExpr1.Sort.plan |   4 +-
 .../testLeftOuterJoinWithEmptyTable1.Hash.plan  |   4 +-
 .../testLeftOuterJoinWithEmptyTable1.Sort.plan  |   4 +-
 .../testLeftOuterJoinWithEmptyTable2.Hash.plan  |   8 +-
 .../testLeftOuterJoinWithEmptyTable2.Sort.plan  |   8 +-
 .../testLeftOuterJoinWithEmptyTable3.Hash.plan  |  12 +-
 .../testLeftOuterJoinWithEmptyTable3.Sort.plan  |  12 +-
 .../testLeftOuterJoinWithEmptyTable4.Hash.plan  |   8 +-
 .../testLeftOuterJoinWithEmptyTable4.Sort.plan  |   8 +-
 .../testLeftOuterJoinWithEmptyTable5.Hash.plan  |   8 +-
 .../testLeftOuterJoinWithEmptyTable5.Sort.plan  |   8 +-
 .../testLeftOuterJoinWithNull1.Hash.plan        |   4 +-
 .../testLeftOuterJoinWithNull1.Sort.plan        |   4 +-
 .../testLeftOuterJoinWithNull2.Hash.plan        |   4 +-
 .../testLeftOuterJoinWithNull2.Sort.plan        |   4 +-
 .../testLeftOuterJoinWithNull3.Hash.plan        |   4 +-
 .../testLeftOuterJoinWithNull3.Sort.plan        |   4 +-
 ...leBroadcastDataFileWithZeroLength2.Hash.plan |  14 +-
 ...leBroadcastDataFileWithZeroLength2.Sort.plan |  14 +-
 .../testOuterJoinAndCaseWhen1.Hash.plan         |  30 +-
 .../testOuterJoinAndCaseWhen1.Sort.plan         |  30 +-
 .../testRightOuterJoin1.Hash.plan               |   4 +-
 .../testRightOuterJoin1.Sort.plan               |   4 +-
 ...tOuterJoinPredicationCaseByCase1.1.Hash.plan | 101 ++++++
 ...edicationCaseByCase1.1.Hash_NoBroadcast.plan | 128 +++++++
 ...tOuterJoinPredicationCaseByCase1.1.Sort.plan | 101 ++++++
 ...edicationCaseByCase1.1.Sort_NoBroadcast.plan | 128 +++++++
 ...ightOuterJoinPredicationCaseByCase1.1.result |   4 +
 ...tOuterJoinPredicationCaseByCase2.1.Hash.plan |  76 ++++
 ...edicationCaseByCase2.1.Hash_NoBroadcast.plan | 132 +++++++
 ...tOuterJoinPredicationCaseByCase2.1.Sort.plan |  76 ++++
 ...edicationCaseByCase2.1.Sort_NoBroadcast.plan | 132 +++++++
 ...ightOuterJoinPredicationCaseByCase2.1.result |   4 +
 ...tOuterJoinPredicationCaseByCase3.1.Hash.plan | 101 ++++++
 ...edicationCaseByCase3.1.Hash_NoBroadcast.plan | 128 +++++++
 ...tOuterJoinPredicationCaseByCase3.1.Sort.plan | 101 ++++++
 ...edicationCaseByCase3.1.Sort_NoBroadcast.plan | 128 +++++++
 ...ightOuterJoinPredicationCaseByCase3.1.result |   4 +
 .../testRightOuterJoinWithEmptyTable1.Hash.plan |   4 +-
 .../testRightOuterJoinWithEmptyTable1.Sort.plan |   4 +-
 ...testLeftOuterJoinWithConstantExpr2.Hash.plan |  69 ++--
 ...testLeftOuterJoinWithConstantExpr2.Sort.plan |  69 ++--
 ...tLeftOuterJoinWithEmptySubquery1.1.Hash.plan |  96 ++---
 ...tLeftOuterJoinWithEmptySubquery1.1.Sort.plan |  96 ++---
 ...tLeftOuterJoinWithEmptySubquery2.1.Hash.plan |  88 ++---
 ...tLeftOuterJoinWithEmptySubquery2.1.Sort.plan |  88 ++---
 .../testSubQuerySortAfterGroupMultiBlocks.plan  | 112 ++++++
 ...testSubQuerySortAfterGroupMultiBlocks.result |   5 +
 .../results/TestTPCH/testQ1OrderBy.plan         |  88 +++++
 .../results/TestTPCH/testQ1OrderBy.result       |   2 +-
 .../results/TestTPCH/testQ2FourJoins.plan       | 228 ++++++++++++
 .../results/TestTPCH/testQ2FourJoins.result     |   2 +-
 .../results/TestTPCH/testTPCH14Expr.plan        | 119 +++++++
 .../results/TestTPCH/testTPCH14Expr.result      |   2 +-
 .../resources/results/TestTPCH/testTPCHQ5.plan  | 332 ++++++++++++++++++
 .../results/TestTPCH/testTPCHQ5.result          |   2 +
 .../TestUnionQuery/testComplexUnion1.plan       |  80 +++++
 .../TestUnionQuery/testComplexUnion1.result     |   6 +
 .../TestUnionQuery/testComplexUnion2.plan       | 116 +++++++
 .../TestUnionQuery/testComplexUnion2.result     |   7 +
 .../plan/expr/AggregationFunctionCallEval.java  |  63 ++--
 .../function/python/PythonScriptEngine.java     |  17 +-
 .../plan/function/python/TajoScriptEngine.java  |  13 +-
 .../tajo/plan/logical/CreateTableNode.java      |   5 +
 .../org/apache/tajo/plan/logical/JoinNode.java  |  22 +-
 .../plan/rewrite/rules/FilterPushDownRule.java  |   1 +
 .../tajo/plan/serder/EvalNodeDeserializer.java  |  14 +-
 .../tajo/plan/serder/EvalNodeSerializer.java    |   4 +-
 .../plan/serder/LogicalNodeDeserializer.java    |   5 -
 .../tajo/plan/serder/LogicalNodeSerializer.java |  18 +-
 tajo-plan/src/main/proto/Plan.proto             |  12 +-
 243 files changed, 6687 insertions(+), 4087 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index aeda418..da63606 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,8 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1553: Improve broadcast join planning. (jihoon)
+
     TAJO-1577: Add test cases to verify join plans. (jihoon)
 
     TAJO-1607: Tajo Rest Cache-Id should be bigger than zero. (Contributed by 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/engine/planner/BroadcastJoinMarkCandidateVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/BroadcastJoinMarkCandidateVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/BroadcastJoinMarkCandidateVisitor.java
deleted file mode 100644
index 6b3dc48..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/BroadcastJoinMarkCandidateVisitor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner;
-
-import org.apache.tajo.engine.planner.global.GlobalPlanner;
-import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.PlanningException;
-import org.apache.tajo.plan.logical.JoinNode;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.NodeType;
-import org.apache.tajo.plan.logical.ScanNode;
-
-import java.util.Stack;
-
-public class BroadcastJoinMarkCandidateVisitor extends BasicLogicalPlanVisitor<GlobalPlanner.GlobalPlanContext, LogicalNode> {
-  @Override
-  public LogicalNode visitJoin(GlobalPlanner.GlobalPlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
-                               JoinNode node, Stack<LogicalNode> stack) throws PlanningException {
-    LogicalNode leftChild = node.getLeftChild();
-    LogicalNode rightChild = node.getRightChild();
-
-    if (ScanNode.isScanNode(leftChild) && ScanNode.isScanNode(rightChild)) {
-      node.setCandidateBroadcast(true);
-      return node;
-    }
-
-    if(!ScanNode.isScanNode(leftChild)) {
-      visit(context, plan, block, leftChild, stack);
-    }
-
-    if(!ScanNode.isScanNode(rightChild)) {
-      visit(context, plan, block, rightChild, stack);
-    }
-
-    if(isBroadcastCandidateNode(leftChild) &&
-        isBroadcastCandidateNode(rightChild)) {
-      node.setCandidateBroadcast(true);
-    }
-
-    return node;
-  }
-
-  public static boolean isBroadcastCandidateNode(LogicalNode node) {
-    if(node.getType() == NodeType.SCAN ||
-        node.getType() == NodeType.PARTITIONS_SCAN) {
-      return true;
-    }
-
-    if(node.getType() == NodeType.JOIN && ((JoinNode)node).isCandidateBroadcast()) {
-      return true;
-    }
-
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/engine/planner/BroadcastJoinPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/BroadcastJoinPlanVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/BroadcastJoinPlanVisitor.java
deleted file mode 100644
index bd8efbe..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/BroadcastJoinPlanVisitor.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner;
-
-import org.apache.tajo.engine.planner.global.GlobalPlanner;
-import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.PlanningException;
-import org.apache.tajo.plan.logical.JoinNode;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.NodeType;
-import org.apache.tajo.plan.logical.ScanNode;
-
-import java.util.Stack;
-
-public class BroadcastJoinPlanVisitor extends BasicLogicalPlanVisitor<GlobalPlanner.GlobalPlanContext, LogicalNode> {
-  @Override
-  public LogicalNode visitJoin(GlobalPlanner.GlobalPlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
-                               JoinNode node, Stack<LogicalNode> stack) throws PlanningException {
-    LogicalNode leftChild = node.getLeftChild();
-    LogicalNode rightChild = node.getRightChild();
-
-    if (leftChild.getType() == NodeType.JOIN  && ScanNode.isScanNode(rightChild)) {
-      node.getBroadcastCandidateTargets().add(node);
-    }
-    LogicalNode parentNode = stack.peek();
-    if (parentNode != null && parentNode.getType() == NodeType.JOIN) {
-      node.getBroadcastCandidateTargets().addAll(((JoinNode)parentNode).getBroadcastCandidateTargets());
-    }
-
-    Stack<LogicalNode> currentStack = new Stack<LogicalNode>();
-    currentStack.push(node);
-    if(!ScanNode.isScanNode(leftChild)) {
-      visit(context, plan, block, leftChild, currentStack);
-    }
-
-    if(!ScanNode.isScanNode(rightChild)) {
-      visit(context, plan, block, rightChild, currentStack);
-    }
-    currentStack.pop();
-
-    return node;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 0252051..30cb24f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -34,7 +34,6 @@ import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.physical.*;
@@ -47,6 +46,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAg
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index aecb364..4f352c1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -17,6 +17,7 @@ package org.apache.tajo.engine.planner.global;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.util.TUtil;
 
 import java.util.*;
 
@@ -39,8 +40,56 @@ public class ExecutionBlock {
 
   private boolean hasJoinPlan;
   private boolean hasUnionPlan;
-
-  private Set<String> broadcasted = new HashSet<String>();
+  private boolean isUnionOnly;
+
+  private Map<String, ScanNode> broadcastRelations = TUtil.newHashMap();
+
+  /*
+   * An execution block is null-supplying or preserved-row when its output is used as an input for outer join.
+   * These properties are decided based on the type of parent execution block's outer join.
+   * Here are brief descriptions for these properties.
+   *
+   * 1) left outer join
+   *
+   *              parent eb
+   *         -------------------
+   *         | left outer join |
+   *         -------------------
+   *           /              \
+   *   left child eb     right child eb
+   * ----------------- ------------------
+   * | preserved-row | | null-supplying |
+   * ----------------- ------------------
+   *
+   * 2) right outer join
+   *
+   *               parent eb
+   *         --------------------
+   *         | right outer join |
+   *         --------------------
+   *           /              \
+   *   left child eb     right child eb
+   * ------------------ -----------------
+   * | null-supplying | | preserved-row |
+   * ------------------ -----------------
+   *
+   * 3) full outer join
+   *
+   *               parent eb
+   *         -------------------
+   *         | full outer join |
+   *         -------------------
+   *           /              \
+   *   left child eb      right child eb
+   * ------------------ ------------------
+   * | null-supplying | | preserved-row  |
+   * | preserved-row  | | null-supplying |
+   * ------------------ ------------------
+   *
+   * The null-supplying and preserved-row properties are used to find which relations will be broadcasted.
+   */
+  protected boolean nullSuppllying = false;
+  protected boolean preservedRow = false;
 
   public ExecutionBlock(ExecutionBlockId executionBlockId) {
     this.executionBlockId = executionBlockId;
@@ -53,6 +102,7 @@ public class ExecutionBlock {
   public void setPlan(LogicalNode plan) {
     hasJoinPlan = false;
     hasUnionPlan = false;
+    isUnionOnly = true;
     this.scanlist.clear();
     this.plan = plan;
 
@@ -65,6 +115,12 @@ public class ExecutionBlock {
     s.add(node);
     while (!s.isEmpty()) {
       node = s.remove(s.size()-1);
+      // TODO: the below code should be improved to handle every case
+      if (isUnionOnly && node.getType() != NodeType.ROOT && node.getType() != NodeType.TABLE_SUBQUERY &&
+          node.getType() != NodeType.SCAN && node.getType() != NodeType.PARTITIONS_SCAN &&
+          node.getType() != NodeType.UNION && node.getType() != NodeType.PROJECTION) {
+        isUnionOnly = false;
+      }
       if (node instanceof UnaryNode) {
         UnaryNode unary = (UnaryNode) node;
         s.add(s.size(), unary.getChild());
@@ -86,6 +142,9 @@ public class ExecutionBlock {
         store = (StoreTableNode)node;
       }
     }
+    if (!hasUnionPlan) {
+      isUnionOnly = false;
+    }
   }
 
   public void addUnionScan(ExecutionBlockId realScanEbId, ExecutionBlockId delegatedScanEbId) {
@@ -108,6 +167,16 @@ public class ExecutionBlock {
     return store;
   }
 
+  public int getNonBroadcastRelNum() {
+    int nonBroadcastRelNum = 0;
+    for (ScanNode scanNode : scanlist) {
+      if (!broadcastRelations.containsKey(scanNode.getCanonicalName())) {
+        nonBroadcastRelNum++;
+      }
+    }
+    return nonBroadcastRelNum;
+  }
+
   public ScanNode [] getScanNodes() {
     return this.scanlist.toArray(new ScanNode[scanlist.size()]);
   }
@@ -120,25 +189,49 @@ public class ExecutionBlock {
     return hasUnionPlan;
   }
 
-  public void addBroadcastTable(String tableName) {
-    broadcasted.add(tableName);
-    enforcer.addBroadcast(tableName);
+  public boolean isUnionOnly() {
+    return isUnionOnly;
   }
 
-  public void removeBroadcastTable(String tableName) {
-    broadcasted.remove(tableName);
-    enforcer.removeBroadcast(tableName);
+  public void addBroadcastRelation(ScanNode relationNode) {
+    broadcastRelations.put(relationNode.getCanonicalName(), relationNode);
+    enforcer.addBroadcast(relationNode.getCanonicalName());
   }
 
-  public boolean isBroadcastTable(String tableName) {
-    return broadcasted.contains(tableName);
+  public void removeBroadcastRelation(ScanNode relationNode) {
+    broadcastRelations.remove(relationNode.getCanonicalName());
+    enforcer.removeBroadcast(relationNode.getCanonicalName());
   }
 
-  public Collection<String> getBroadcastTables() {
-    return broadcasted;
+  public boolean isBroadcastRelation(ScanNode relationNode) {
+    return broadcastRelations.containsKey(relationNode.getCanonicalName());
+  }
+
+  public boolean hasBroadcastRelation() {
+    return broadcastRelations.size() > 0;
+  }
+
+  public Collection<ScanNode> getBroadcastRelations() {
+    return broadcastRelations.values();
   }
 
   public String toString() {
     return executionBlockId.toString();
   }
+
+  public void setNullSuppllying() {
+    nullSuppllying = true;
+  }
+
+  public void setPreservedRow() {
+    preservedRow = true;
+  }
+
+  public boolean isNullSuppllying() {
+    return nullSuppllying;
+  }
+
+  public boolean isPreservedRow() {
+    return preservedRow;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 48b5326..89e887a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -35,17 +35,15 @@ import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.BroadcastJoinMarkCandidateVisitor;
-import org.apache.tajo.engine.planner.BroadcastJoinPlanVisitor;
 import org.apache.tajo.engine.planner.global.builder.DistinctGroupbyBuilder;
 import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteEngine;
 import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteRuleProvider;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.Target;
 import org.apache.tajo.plan.expr.*;
-import org.apache.tajo.plan.function.AggFunction;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule;
 import org.apache.tajo.plan.util.PlannerUtil;
@@ -119,7 +117,7 @@ public class GlobalPlanner {
   /**
    * Builds a master plan from the given logical plan.
    */
-  public void build(MasterPlan masterPlan) throws IOException, PlanningException {
+  public void build(QueryContext queryContext, MasterPlan masterPlan) throws IOException, PlanningException {
 
     DistributedPlannerVisitor planner = new DistributedPlannerVisitor();
     GlobalPlanContext globalPlanContext = new GlobalPlanContext();
@@ -132,19 +130,6 @@ public class GlobalPlanner {
     LogicalNode inputPlan = PlannerUtil.clone(masterPlan.getLogicalPlan(),
         masterPlan.getLogicalPlan().getRootBlock().getRoot());
 
-    boolean broadcastEnabled = masterPlan.getContext().getBool(SessionVars.TEST_BROADCAST_JOIN_ENABLED);
-    if (broadcastEnabled) {
-      // pre-visit the master plan in order to find tables to be broadcasted
-      // this visiting does not make any execution block and change plan.
-      BroadcastJoinMarkCandidateVisitor markCandidateVisitor = new BroadcastJoinMarkCandidateVisitor();
-      markCandidateVisitor.visit(globalPlanContext,
-          masterPlan.getLogicalPlan(), masterPlan.getLogicalPlan().getRootBlock(), inputPlan, new Stack<LogicalNode>());
-
-      BroadcastJoinPlanVisitor broadcastJoinPlanVisitor = new BroadcastJoinPlanVisitor();
-      broadcastJoinPlanVisitor.visit(globalPlanContext,
-          masterPlan.getLogicalPlan(), masterPlan.getLogicalPlan().getRootBlock(), inputPlan, new Stack<LogicalNode>());
-    }
-
     // create a distributed execution plan by visiting each logical node.
     // Its output is a graph, where each vertex is an execution block, and each edge is a data channel.
     // MasterPlan contains them.
@@ -167,9 +152,10 @@ public class GlobalPlanner {
     }
 
     masterPlan.setTerminal(terminalBlock);
-    LOG.info("\n" + masterPlan.toString());
+    LOG.info("\n\nNon-optimized master plan\n" + masterPlan.toString());
 
-    rewriteEngine.rewrite(masterPlan);
+    masterPlan = rewriteEngine.rewrite(queryContext, masterPlan);
+    LOG.info("\n\nOptimized master plan\n" + masterPlan.toString());
   }
 
   private static void setFinalOutputChannel(DataChannel outputChannel, Schema outputSchema) {
@@ -209,187 +195,12 @@ public class GlobalPlanner {
     return channel;
   }
 
-  /**
-   * It calculates the total volume of all descendent relation nodes.
-   */
-  public static long computeDescendentVolume(LogicalNode node) throws PlanningException {
-
-    if (node instanceof RelationNode) {
-      switch (node.getType()) {
-      case SCAN:
-        ScanNode scanNode = (ScanNode) node;
-        if (scanNode.getTableDesc().getStats() == null) {
-          // TODO - this case means that data is not located in HDFS. So, we need additional
-          // broadcast method.
-          return Long.MAX_VALUE;
-        } else {
-          return scanNode.getTableDesc().getStats().getNumBytes();
-        }
-      case PARTITIONS_SCAN:
-        PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) node;
-        if (pScanNode.getTableDesc().getStats() == null) {
-          // TODO - this case means that data is not located in HDFS. So, we need additional
-          // broadcast method.
-          return Long.MAX_VALUE;
-        } else {
-          // if there is no selected partition
-          if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) {
-            return 0;
-          } else {
-            return pScanNode.getTableDesc().getStats().getNumBytes();
-          }
-        }
-      case TABLE_SUBQUERY:
-        return computeDescendentVolume(((TableSubQueryNode) node).getSubQuery());
-      default:
-        throw new IllegalArgumentException("Not RelationNode");
-      }
-    } else if (node instanceof UnaryNode) {
-      return computeDescendentVolume(((UnaryNode) node).getChild());
-    } else if (node instanceof BinaryNode) {
-      BinaryNode binaryNode = (BinaryNode) node;
-      return computeDescendentVolume(binaryNode.getLeftChild()) + computeDescendentVolume(binaryNode.getRightChild());
-    }
-
-    throw new PlanningException("Invalid State");
-  }
-
-  private static boolean checkIfCanBeOneOfBroadcastJoin(LogicalNode node) {
-    return node.getType() == NodeType.SCAN || node.getType() == NodeType.PARTITIONS_SCAN;
-  }
-
-  /**
-   * Get a volume of a table of a partitioned table
-   * @param scanNode ScanNode corresponding to a table
-   * @return table volume (bytes)
-   */
-  private static long getTableVolume(ScanNode scanNode) {
-    long scanBytes = scanNode.getTableDesc().getStats().getNumBytes();
-    if (scanNode.getType() == NodeType.PARTITIONS_SCAN) {
-      PartitionedTableScanNode pScanNode = (PartitionedTableScanNode)scanNode;
-      if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) {
-        scanBytes = 0L;
-      }
-    }
-
-    return scanBytes;
-  }
-
   private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode,
                                         ExecutionBlock leftBlock, ExecutionBlock rightBlock)
       throws PlanningException {
     MasterPlan masterPlan = context.plan;
     ExecutionBlock currentBlock;
 
-    boolean broadcastEnabled = context.getPlan().getContext().getBool(SessionVars.TEST_BROADCAST_JOIN_ENABLED);
-    long broadcastTableSizeLimit = context.getPlan().getContext().getLong(SessionVars.BROADCAST_TABLE_SIZE_LIMIT);
-
-    // to check when the tajo.dist-query.join.broadcast.auto property is true
-    if (broadcastEnabled && joinNode.isCandidateBroadcast()) {
-      LogicalNode leftNode = joinNode.getLeftChild();
-      LogicalNode rightNode = joinNode.getRightChild();
-
-      List<ScanNode> broadcastTargetScanNodes = new ArrayList<ScanNode>();
-      int numLargeTables = 0;
-      boolean leftBroadcast = false;
-      boolean rightBroadcast = false;
-
-      // TODO - in the the current implementation, a broadcast join on a bush join tree is not supported yet.
-      //
-      //        Join
-      //       /    \
-      //   Join     Join
-      //   /  \     /  \
-      // Scan Scan Scan Scan
-
-
-      // Checking Left Side of Join
-      if (ScanNode.isScanNode(leftNode)) {
-        ScanNode scanNode = (ScanNode)leftNode;
-        if (joinNode.getJoinType() == JoinType.LEFT_OUTER || getTableVolume(scanNode) >= broadcastTableSizeLimit) {
-          numLargeTables++;
-        } else {
-          leftBroadcast = true;
-          broadcastTargetScanNodes.add(scanNode);
-          LOG.info("JoinNode's left table " + scanNode.getCanonicalName() + " ("
-              + getTableVolume(scanNode) + ") is marked a broadcasted table");
-        }
-      }
-
-      // Checking Right Side OF Join
-      if (ScanNode.isScanNode(rightNode)) {
-        ScanNode scanNode = (ScanNode)rightNode;
-        if (joinNode.getJoinType() == JoinType.RIGHT_OUTER || getTableVolume(scanNode) >= broadcastTableSizeLimit) {
-          numLargeTables++;
-        } else {
-          rightBroadcast = true;
-          broadcastTargetScanNodes.add(scanNode);
-          LOG.info("JoinNode's right table " + scanNode.getCanonicalName() + " ("
-              + getTableVolume(scanNode) + ") is marked a broadcasted table");
-        }
-      }
-
-      JoinNode blockJoinNode = null;
-      if (!leftBroadcast && !rightBroadcast) {
-        // In the case of large, large, small, small
-        // all small tables broadcast to right large table
-        numLargeTables = 1;
-      }
-      for(LogicalNode eachNode: joinNode.getBroadcastCandidateTargets()) {
-        if (eachNode.getPID() == joinNode.getPID()) {
-          continue;
-        }
-        if (numLargeTables >= 2) {
-          break;
-        }
-        JoinNode broadcastJoinNode = (JoinNode)eachNode;
-        ScanNode scanNode = broadcastJoinNode.getRightChild();
-        if (getTableVolume(scanNode) < broadcastTableSizeLimit) {
-          broadcastTargetScanNodes.add(scanNode);
-          blockJoinNode = broadcastJoinNode;
-          LOG.info("The table " + scanNode.getCanonicalName() + " ("
-              + getTableVolume(scanNode) + ") is marked a broadcasted table");
-        } else {
-          numLargeTables++;
-          if (numLargeTables < 2) {
-            blockJoinNode = broadcastJoinNode;
-          }
-        }
-      }
-
-      if (!broadcastTargetScanNodes.isEmpty()) {
-        // make new execution block
-        currentBlock = masterPlan.newExecutionBlock();
-
-        if (!leftBroadcast && !rightBroadcast) {
-          DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true);
-          ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel);
-          joinNode.setLeftChild(leftScan);
-          masterPlan.addConnect(leftChannel);
-
-          DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, false);
-          ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel);
-          joinNode.setRightChild(rightScan);
-          masterPlan.addConnect(rightChannel);
-        }
-
-        if (blockJoinNode != null) {
-          LOG.info("Set execution's plan with join " + blockJoinNode + " for broadcast join");
-          // set current execution block's plan with last broadcast join node
-          currentBlock.setPlan(blockJoinNode);
-        } else {
-          currentBlock.setPlan(joinNode);
-        }
-
-        for (ScanNode eachBroadcastTarget: broadcastTargetScanNodes) {
-          currentBlock.addBroadcastTable(eachBroadcastTarget.getCanonicalName());
-          context.execBlockMap.remove(eachBroadcastTarget.getPID());
-        }
-
-        return currentBlock;
-      }
-    }
-
     LogicalNode leftNode = joinNode.getLeftChild();
     LogicalNode rightNode = joinNode.getRightChild();
 
@@ -961,7 +772,7 @@ public class GlobalPlanner {
         firstPhaseEvalNames[i] = plan.generateUniqueColumnName(firstPhaseEvals[i]);
         FieldEval param = new FieldEval(firstPhaseEvalNames[i], firstPhaseEvals[i].getValueType());
 
-        secondPhaseEvals[i].setFinalPhase();
+        secondPhaseEvals[i].setLastPhase();
         secondPhaseEvals[i].setArgs(new EvalNode[]{param});
       }
 
@@ -1355,33 +1166,20 @@ public class GlobalPlanner {
       LogicalNode leftChild = visit(context, plan, block, node.getLeftChild(), stack);
       ExecutionBlock leftChildBlock = context.execBlockMap.get(leftChild.getPID());
 
-      if (leftChild.getType() == NodeType.JOIN && checkIfCanBeOneOfBroadcastJoin(node.getRightChild())) {
-        ScanNode scanNode = node.getRightChild();
-        if (leftChildBlock.isBroadcastTable(scanNode.getCanonicalName())) {
-          context.execBlockMap.put(node.getPID(), leftChildBlock);
-          return node;
-        }
-
-        // if left execution block's plan is replaced with parent node(join node)
-        if (leftChildBlock.getPlan().getPID() == node.getPID()) {
-          context.execBlockMap.put(node.getPID(), leftChildBlock);
-          return node;
-        }
-      }
-
       LogicalNode rightChild = visit(context, plan, block, node.getRightChild(), stack);
       ExecutionBlock rightChildBlock = context.execBlockMap.get(rightChild.getPID());
 
-      // In the case of broadcast join leftChildBlock can be replaced with upper join node.
-      // So if the current join node is a child node of leftChildBlock's plan(join node)
-      // the current join node already participates in broadcast join.
-      LogicalNode leftChildBlockNode = leftChildBlock.getPlan();
-      // If child block is union, child block has not plan
-      if (leftChildBlockNode != null && leftChildBlockNode.getType() == NodeType.JOIN) {
-        if (leftChildBlockNode.getPID() > node.getPID()) {
-          context.execBlockMap.put(node.getPID(), leftChildBlock);
-          return node;
-        }
+      if (node.getJoinType() == JoinType.LEFT_OUTER) {
+        leftChildBlock.setPreservedRow();
+        rightChildBlock.setNullSuppllying();
+      } else if (node.getJoinType() == JoinType.RIGHT_OUTER) {
+        leftChildBlock.setNullSuppllying();
+        rightChildBlock.setPreservedRow();
+      } else if (node.getJoinType() == JoinType.FULL_OUTER) {
+        leftChildBlock.setPreservedRow();
+        leftChildBlock.setNullSuppllying();
+        rightChildBlock.setPreservedRow();
+        rightChildBlock.setNullSuppllying();
       }
 
       ExecutionBlock newExecBlock = buildJoinPlan(context, node, leftChildBlock, rightChildBlock);
@@ -1400,7 +1198,7 @@ public class GlobalPlanner {
       LogicalPlan.QueryBlock rightQueryBlock = plan.getBlock(node.getRightChild());
       LogicalNode rightChild = visit(context, plan, rightQueryBlock, rightQueryBlock.getRoot(), stack);
       stack.pop();
-      
+
       MasterPlan masterPlan = context.getPlan();
 
       List<ExecutionBlock> unionBlocks = Lists.newArrayList();
@@ -1611,29 +1409,4 @@ public class GlobalPlanner {
       return node;
     }
   }
-
-  @SuppressWarnings("unused")
-  private static class ConsecutiveUnionFinder extends BasicLogicalPlanVisitor<List<UnionNode>, LogicalNode> {
-    @Override
-    public LogicalNode visitUnion(List<UnionNode> unionNodeList, LogicalPlan plan, LogicalPlan.QueryBlock queryBlock,
-                                  UnionNode node, Stack<LogicalNode> stack)
-        throws PlanningException {
-      if (node.getType() == NodeType.UNION) {
-        unionNodeList.add(node);
-      }
-
-      stack.push(node);
-      TableSubQueryNode leftSubQuery = node.getLeftChild();
-      TableSubQueryNode rightSubQuery = node.getRightChild();
-      if (leftSubQuery.getSubQuery().getType() == NodeType.UNION) {
-        visit(unionNodeList, plan, queryBlock, leftSubQuery, stack);
-      }
-      if (rightSubQuery.getSubQuery().getType() == NodeType.UNION) {
-        visit(unionNodeList, plan, queryBlock, rightSubQuery, stack);
-      }
-      stack.pop();
-
-      return node;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index 3cca4f2..6e9b74f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -26,6 +26,7 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.util.graph.DirectedGraphVisitor;
 import org.apache.tajo.util.graph.SimpleDirectedGraph;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -108,6 +109,20 @@ public class MasterPlan {
     return execBlockMap.get(execBlockId);
   }
 
+  /**
+   * Removes an execution block from this plan's block map. The block must be disconnected, i.e., it must have
+   * neither incoming nor outgoing data channels.
+   *
+   * @param execBlockId id of the execution block to remove
+   * @throws IllegalStateException if the block still has an incoming or outgoing data channel
+   */
+  public void removeExecBlock(ExecutionBlockId execBlockId) throws IllegalStateException {
+    List<DataChannel> incoming = getIncomingChannels(execBlockId);
+    boolean connected = incoming != null && !incoming.isEmpty();
+    if (!connected) {
+      // Outgoing channels only need to be inspected when there is no incoming channel.
+      List<DataChannel> outgoing = getOutgoingChannels(execBlockId);
+      connected = outgoing != null && !outgoing.isEmpty();
+    }
+
+    if (connected) {
+      throw new IllegalStateException("Cannot remove execution blocks because some other execution blocks are connected");
+    }
+
+    execBlockMap.remove(execBlockId);
+  }
+
   public void addConnect(DataChannel dataChannel) {
     execBlockGraph.addEdge(dataChannel.getSrcId(), dataChannel.getTargetId(), dataChannel);
   }
@@ -204,6 +219,10 @@ public class MasterPlan {
     return getChild(executionBlock.getId(), idx);
   }
 
+  /**
+   * Traverses the execution block graph starting from the given execution block by delegating to the
+   * underlying graph's {@code accept}.
+   *
+   * @param v id of the execution block where the traversal starts
+   * @param visitor visitor receiving the traversed execution block ids
+   */
+  public void accept(ExecutionBlockId v, DirectedGraphVisitor<ExecutionBlockId> visitor) {
+    this.execBlockGraph.accept(v, visitor);
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
index 13ed99b..8095458 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
@@ -250,11 +250,6 @@ public class DistinctGroupbyBuilder {
     for (DistinctGroupbyNodeBuildInfo buildInfo: distinctNodeBuildInfos.values()) {
       GroupbyNode eachGroupbyNode = buildInfo.getGroupbyNode();
       List<AggregationFunctionCallEval> groupbyAggFunctions = buildInfo.getAggFunctions();
-      String [] firstPhaseEvalNames = new String[groupbyAggFunctions.size()];
-      int index = 0;
-      for (AggregationFunctionCallEval eachCallEval: groupbyAggFunctions) {
-        firstPhaseEvalNames[index++] = eachCallEval.getName();
-      }
 
       Target[] targets = new Target[eachGroupbyNode.getGroupingColumns().length + groupbyAggFunctions.size()];
       int targetIdx = 0;
@@ -411,7 +406,7 @@ public class DistinctGroupbyBuilder {
         buildInfo.addAggFunction(aggFunction);
         buildInfo.addAggFunctionTarget(aggFunctionTarget);
       } else {
-        aggFunction.setFinalPhase();
+        aggFunction.setLastPhase();
         otherAggregationFunctionCallEvals.add(aggFunction);
         otherAggregationFunctionTargets.add(aggFunctionTarget);
       }
@@ -562,6 +557,7 @@ public class DistinctGroupbyBuilder {
         }
 
         for (int aggFuncIdx = 0; aggFuncIdx < secondStageGroupbyNode.getAggFunctions().length; aggFuncIdx++) {
+          secondStageGroupbyNode.getAggFunctions()[aggFuncIdx].setLastPhase();
           int targetIdx = originGroupColumns.size() + uniqueDistinctColumn.size() + aggFuncIdx;
           Target aggFuncTarget = oldTargets[targetIdx];
           secondGroupbyTargets.add(aggFuncTarget);
@@ -591,6 +587,7 @@ public class DistinctGroupbyBuilder {
 
           AggregationFunctionCallEval secondStageAggFunction = secondStageGroupbyNode.getAggFunctions()[aggFuncIdx];
           secondStageAggFunction.setArgs(new EvalNode[] {firstEval});
+          secondStageAggFunction.setLastPhase();
 
           Target secondTarget = secondStageGroupbyNode.getTargets()[secondStageGroupbyNode.getGroupingColumns().length + aggFuncIdx];
           Column column = secondTarget.getNamedColumn();

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/BaseGlobalPlanRewriteRuleProvider.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/BaseGlobalPlanRewriteRuleProvider.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/BaseGlobalPlanRewriteRuleProvider.java
index 96ee2c6..1ae4056 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/BaseGlobalPlanRewriteRuleProvider.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/BaseGlobalPlanRewriteRuleProvider.java
@@ -18,7 +18,9 @@
 
 package org.apache.tajo.engine.planner.global.rewriter;
 
+import com.google.common.collect.Lists;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.global.rewriter.rules.BroadcastJoinRule;
 import org.apache.tajo.util.TUtil;
 
 import java.util.Collection;
@@ -34,6 +36,8 @@ public class BaseGlobalPlanRewriteRuleProvider extends GlobalPlanRewriteRuleProv
 
   @Override
   public Collection<Class<? extends GlobalPlanRewriteRule>> getRules() {
-    return EMPTY_RULES;
+    List<Class<? extends GlobalPlanRewriteRule>> rules = Lists.newArrayList();
+    rules.add(BroadcastJoinRule.class);
+    return rules;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteEngine.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteEngine.java
index c01ed0e..cc1f3c6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteEngine.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner.global.rewriter;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.OverridableConf;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.util.ReflectionUtil;
@@ -67,11 +68,11 @@ public class GlobalPlanRewriteEngine {
    * @param plan The plan to be rewritten with all query rewrite rule.
    * @return The rewritten plan.
    */
-  public MasterPlan rewrite(MasterPlan plan) throws PlanningException {
+  public MasterPlan rewrite(OverridableConf queryContext, MasterPlan plan) throws PlanningException {
     GlobalPlanRewriteRule rule;
     for (Map.Entry<String, GlobalPlanRewriteRule> rewriteRule : rewriteRules.entrySet()) {
       rule = rewriteRule.getValue();
-      if (rule.isEligible(plan)) {
+      if (rule.isEligible(queryContext, plan)) {
         plan = rule.rewrite(plan);
         if (LOG.isDebugEnabled()) {
           LOG.debug("The rule \"" + rule.getName() + " \" rewrites the query.");

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRule.java
index 4a37207..f30160f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRule.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRule.java
@@ -18,7 +18,9 @@
 
 package org.apache.tajo.engine.planner.global.rewriter;
 
+import org.apache.tajo.OverridableConf;
 import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.plan.PlanningException;
 
 /**
  * A rewrite rule for global plans
@@ -29,15 +31,16 @@ public interface GlobalPlanRewriteRule {
    * Return rule name
    * @return Rule name
    */
-  public abstract String getName();
+  String getName();
 
   /**
    * Check if this rule should be applied.
    *
+   * @param queryContext Query context
    * @param plan Global Plan
    * @return
    */
-  public abstract boolean isEligible(MasterPlan plan);
+  boolean isEligible(OverridableConf queryContext, MasterPlan plan);
 
   /**
    * Rewrite a global plan
@@ -45,5 +48,5 @@ public interface GlobalPlanRewriteRule {
    * @param plan Global Plan
    * @return
    */
-  public abstract MasterPlan rewrite(MasterPlan plan);
+  MasterPlan rewrite(MasterPlan plan) throws PlanningException;
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
new file mode 100644
index 0000000..85b5e10
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global.rewriter.rules;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteRule;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.util.graph.DirectedGraphVisitor;
+
+import java.util.*;
+
+/**
+ * {@link BroadcastJoinRule} converts a repartition join plan into a broadcast join plan.
+ * The broadcast join rules are defined as follows.
+ *
+ * <h3>Broadcastable relation</h3>
+ * A relation is broadcastable when its size is not larger than a given threshold.
+ *
+ * <h3>Assumption</h3>
+ * If every input of an execution block is broadcastable, the output of the execution block is also broadcastable.
+ *
+ * <h3>Rules to convert repartition join into broadcast join</h3>
+ * <ul>
+ *   <li>Given an EB containing a join and its child EBs, those EBs can be merged into a single EB if at least one child EB's output is broadcastable.</li>
+ *   <li>Given a user-defined threshold, the total size of broadcast relations of an EB cannot exceed that threshold.
+ *   <ul>
+ *     <li>After merging EBs according to the first rule, the resulting EB may not satisfy the second rule. In this case, repartition join is enforced for large relations to satisfy the second rule.</li>
+ *   </ul></li>
+ *   <li>Preserved-row relations cannot be broadcasted, to avoid duplicated results. That is, a full outer join cannot be executed as a broadcast join.
+ *   <ul>
+ *     <li>Here is a brief background for this rule. Data of preserved-row relations appear in the join result regardless of join conditions. If multiple tasks execute an outer join with broadcasted preserved-row relations, they emit duplicate results.</li>
+ *     <li>Even though a single task could execute an outer join when every input is broadcastable, broadcast join is not allowed if one of the input relations consists of multiple files.</li>
+ *   </ul></li>
+ * </ul>
+ *
+ */
+public class BroadcastJoinRule implements GlobalPlanRewriteRule {
+
+  private BroadcastJoinPlanBuilder planBuilder;
+  private BroadcastJoinPlanFinalizer planFinalizer;
+
+  /**
+   * @return the rule name, which is the simple class name "BroadcastJoinRule"
+   */
+  @Override
+  public String getName() {
+    return BroadcastJoinRule.class.getSimpleName();
+  }
+
+  /**
+   * Checks whether this rule is applicable. It is applicable only when all of the following hold:
+   * <ul>
+   *   <li>{@link SessionVars#TEST_BROADCAST_JOIN_ENABLED} is enabled,</li>
+   *   <li>{@link SessionVars#BROADCAST_TABLE_SIZE_LIMIT} is positive, and</li>
+   *   <li>at least one query block contains a join.</li>
+   * </ul>
+   * When eligible, the plan builder and finalizer used by {@link #rewrite(MasterPlan)} are created as a side effect.
+   *
+   * @param queryContext query context holding session variables
+   * @param plan master plan to be rewritten
+   * @return true if {@link #rewrite(MasterPlan)} should be applied to the plan
+   */
+  @Override
+  public boolean isEligible(OverridableConf queryContext, MasterPlan plan) {
+    if (!queryContext.getBool(SessionVars.TEST_BROADCAST_JOIN_ENABLED)) {
+      return false;
+    }
+
+    // The threshold does not depend on the query block, so it is read and checked once, not per join block.
+    long broadcastSizeThreshold = queryContext.getLong(SessionVars.BROADCAST_TABLE_SIZE_LIMIT);
+    if (broadcastSizeThreshold <= 0) {
+      return false;
+    }
+
+    for (LogicalPlan.QueryBlock block : plan.getLogicalPlan().getQueryBlocks()) {
+      if (block.hasNode(NodeType.JOIN)) {
+        GlobalPlanRewriteUtil.ParentFinder parentFinder = new GlobalPlanRewriteUtil.ParentFinder();
+        RelationSizeComparator relSizeComparator = new RelationSizeComparator();
+        planBuilder = new BroadcastJoinPlanBuilder(plan, relSizeComparator, parentFinder, broadcastSizeThreshold);
+        planFinalizer = new BroadcastJoinPlanFinalizer(plan, relSizeComparator);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Converts repartition joins into broadcast joins. The plan is traversed twice from its root execution block:
+   * first by the plan builder, which merges broadcastable child blocks into their join blocks, and then by the
+   * plan finalizer.
+   *
+   * @param plan master plan to be rewritten in place
+   * @return the given plan after rewriting
+   * @throws PlanningException as declared by {@link GlobalPlanRewriteRule}
+   * @throws IllegalStateException if {@link #isEligible(OverridableConf, MasterPlan)} has not returned true before
+   */
+  @Override
+  public MasterPlan rewrite(MasterPlan plan) throws PlanningException {
+    if (planBuilder == null || planFinalizer == null) {
+      // Both visitors are created only by a successful isEligible() call.
+      throw new IllegalStateException("isEligible() must return true before rewrite() is called");
+    }
+
+    plan.accept(plan.getRoot().getId(), planBuilder);
+    plan.accept(plan.getRoot().getId(), planFinalizer);
+    return plan;
+  }
+
+  /**
+   * Orders scan nodes by ascending table volume, as computed by {@code GlobalPlanRewriteUtil.getTableVolume}.
+   */
+  private static class RelationSizeComparator implements Comparator<ScanNode> {
+
+    @Override
+    public int compare(ScanNode o1, ScanNode o2) {
+      // Long.compare avoids the overflow that subtracting two volumes can cause (e.g., a negative value minus
+      // a value near Long.MAX_VALUE), which would otherwise flip the sign of the result.
+      return Long.compare(GlobalPlanRewriteUtil.getTableVolume(o1), GlobalPlanRewriteUtil.getTableVolume(o2));
+    }
+  }
+
+  /**
+   * If a plan contains only broadcast relations, it will be executed at multiple workers who store any broadcast relations.
+   * {@link BroadcastJoinPlanFinalizer} checks whether every input is a broadcast candidate or not.
+   * If so, it removes the broadcast property from the largest relation.
+   */
+  private static class BroadcastJoinPlanFinalizer implements DirectedGraphVisitor<ExecutionBlockId> {
+    private final MasterPlan plan;
+    private final RelationSizeComparator relSizeComparator;
+
+    public BroadcastJoinPlanFinalizer(MasterPlan plan, RelationSizeComparator relationSizeComparator) {
+      this.plan = plan;
+      this.relSizeComparator = relationSizeComparator;
+    }
+
+    @Override
+    public void visit(Stack<ExecutionBlockId> stack, ExecutionBlockId currentId) {
+      ExecutionBlock current = plan.getExecBlock(currentId);
+      // When every child is a broadcast candidate, enforce non-broadcast for the largest relation for the join to be
+      // computed at the node who stores such largest relation.
+      if (!plan.isTerminal(current) && isFullyBroadcastable(current)) {
+        List<ScanNode> broadcastCandidates = TUtil.newList(current.getBroadcastRelations());
+        ScanNode largest = null;
+        for (ScanNode candidate : broadcastCandidates) {
+          // ">=" keeps the last of equally sized relations, i.e., the same choice as taking the last element
+          // after a stable ascending sort, without sorting the whole list.
+          if (largest == null || relSizeComparator.compare(candidate, largest) >= 0) {
+            largest = candidate;
+          }
+        }
+        // An execution block without broadcast relations has nothing to remove.
+        if (largest != null) {
+          current.removeBroadcastRelation(largest);
+        }
+      }
+    }
+  }
+
+  private static class BroadcastJoinPlanBuilder implements DirectedGraphVisitor<ExecutionBlockId> {
+    private final MasterPlan plan;
+    private final RelationSizeComparator relSizeComparator;
+    private final long broadcastSizeThreshold;
+    private final GlobalPlanRewriteUtil.ParentFinder parentFinder;
+
+    /**
+     * @param plan master plan whose execution blocks are visited
+     * @param relationSizeComparator comparator ordering relations by table volume
+     * @param parentFinder {@link GlobalPlanRewriteUtil.ParentFinder} instance kept for plan rewriting
+     * @param broadcastSizeThreshold size threshold; a relation whose volume does not exceed it can be broadcast
+     */
+    public BroadcastJoinPlanBuilder(MasterPlan plan, RelationSizeComparator relationSizeComparator,
+                                    GlobalPlanRewriteUtil.ParentFinder parentFinder, long broadcastSizeThreshold) {
+      this.plan = plan;
+      this.relSizeComparator = relationSizeComparator;
+      this.parentFinder = parentFinder;
+      this.broadcastSizeThreshold = broadcastSizeThreshold;
+    }
+
+    /**
+     * Dispatches the visited execution block to the leaf or non-leaf handler.
+     */
+    @Override
+    public void visit(Stack<ExecutionBlockId> stack, ExecutionBlockId executionBlockId) {
+      ExecutionBlock block = plan.getExecBlock(executionBlockId);
+      if (!plan.isLeaf(block)) {
+        visitNonLeafNode(block);
+        return;
+      }
+      visitLeafNode(block);
+    }
+
+    /**
+     * Marks the input relations of a leaf execution block whose volume does not exceed the threshold as broadcast
+     * relations. Preserved-row blocks are skipped, because broadcasting preserved-row relations duplicates data.
+     * If the block reads exactly one relation and it is broadcastable, the parent's scan is updated as well.
+     */
+    private void visitLeafNode(ExecutionBlock current) {
+      if (current.isPreservedRow()) {
+        return;
+      }
+
+      boolean everyScanBroadcastable = true;
+      for (ScanNode scan : current.getScanNodes()) {
+        if (GlobalPlanRewriteUtil.getTableVolume(scan) > broadcastSizeThreshold) {
+          everyScanBroadcastable = false;
+        } else {
+          current.addBroadcastRelation(scan);
+        }
+      }
+
+      if (!everyScanBroadcastable || current.getScanNodes().length != 1) {
+        return;
+      }
+      try {
+        updateScanOfParentAsBroadcastable(plan, current);
+      } catch (PlanningException ignored) {
+        // Thrown when the current block has two or more inputs via union; intentionally ignored.
+      }
+    }
+
+    private void visitNonLeafNode(ExecutionBlock current) {
+      // At non-leaf execution blocks, merge broadcastable children's plan with the current plan.
+
+      if (!plan.isTerminal(current)) {
+        if (current.hasJoin()) {
+          List<ExecutionBlock> childs = plan.getChilds(current);
+          Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = current.getUnionScanMap();
+
+          if (current.hasBroadcastRelation()) {
+            // The current execution block and its every child are able to be merged.
+            for (ExecutionBlock child : childs) {
+              try {
+                addUnionNodeIfNecessary(unionScanMap, plan, child, current);
+                mergeTwoPhaseJoin(plan, child, current);
+              } catch (PlanningException e) {
+                throw new RuntimeException(e);
+              }
+            }
+
+            checkTotalSizeOfBroadcastableRelations(current);
+
+            // We assume that if every input of an execution block is broadcastable,
+            // the output of the execution block is also broadcastable.
+            if (!current.isPreservedRow() && isFullyBroadcastable(current)) {
+              try {
+                updateScanOfParentAsBroadcastable(plan, current);
+              } catch (PlanningException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          }
+        } else {
+          List<ScanNode> relations = TUtil.newList(current.getBroadcastRelations());
+          for (ScanNode eachRelation : relations) {
+            current.removeBroadcastRelation(eachRelation);
+          }
+        }
+      }
+    }
+
+    /**
+     * When the total size of broadcastable relations exceeds the threshold, enforce repartition join for large ones
+     * in order to broadcast as many relations as possible.
+     *
+     * @param block
+     */
+    private void checkTotalSizeOfBroadcastableRelations(ExecutionBlock block) {
+      List<ScanNode> broadcastCandidates = TUtil.newList();
+      for (ScanNode scanNode : block.getScanNodes()) {
+        long estimatedRelationSize = GlobalPlanRewriteUtil.getTableVolume(scanNode);
+        if (estimatedRelationSize > 0 && estimatedRelationSize <= broadcastSizeThreshold) {
+          broadcastCandidates.add(scanNode);
+        }
+      }
+      Collections.sort(broadcastCandidates, relSizeComparator);
+
+      // Enforce broadcast for candidates in ascending order of relation size
+      long totalBroadcastVolume = 0;
+      int i;
+      for (i = 0; i < broadcastCandidates.size(); i++) {
+        long volumeOfCandidate = GlobalPlanRewriteUtil.getTableVolume(broadcastCandidates.get(i));
+        if (totalBroadcastVolume + volumeOfCandidate > broadcastSizeThreshold) {
+          break;
+        }
+        totalBroadcastVolume += volumeOfCandidate;
+      }
+
+      for (; i < broadcastCandidates.size(); ) {
+        ScanNode nonBroadcast = broadcastCandidates.remove(i);
+        block.removeBroadcastRelation(nonBroadcast);
+      }
+    }
+
+    private void updateScanOfParentAsBroadcastable(MasterPlan plan, ExecutionBlock current) throws PlanningException {
+      ExecutionBlock parent = plan.getParent(current);
+      if (parent != null && !plan.isTerminal(parent)) {
+        ScanNode scanForCurrent = GlobalPlanRewriteUtil.findScanForChildEb(current, parent);
+        parent.addBroadcastRelation(scanForCurrent);
+      }
+    }
+
+    /**
+     * Merge child execution blocks.
+     *
+     * @param plan master plan
+     * @param child child block
+     * @param parent parent block who has join nodes
+     * @return
+     */
+    private ExecutionBlock mergeTwoPhaseJoin(MasterPlan plan, ExecutionBlock child, ExecutionBlock parent)
+        throws PlanningException {
+      ScanNode scanForChild = GlobalPlanRewriteUtil.findScanForChildEb(child, parent);
+
+      parentFinder.set(scanForChild);
+      parentFinder.find(parent.getPlan());
+      LogicalNode parentOfScanForChild = parentFinder.getFound();
+
+      LogicalNode rootOfChild = child.getPlan();
+      if (rootOfChild.getType() == NodeType.STORE) {
+        rootOfChild = ((StoreTableNode)rootOfChild).getChild();
+      }
+
+      GlobalPlanRewriteUtil.replaceChild(rootOfChild, scanForChild, parentOfScanForChild);
+
+      parent = GlobalPlanRewriteUtil.mergeExecutionBlocks(plan, child, parent);
+      parent.removeBroadcastRelation(scanForChild);
+
+      parent.setPlan(parent.getPlan());
+
+      return parent;
+    }
+
+    private void addUnionNodeIfNecessary(Map<ExecutionBlockId, ExecutionBlockId> unionScanMap, MasterPlan plan,
+                                         ExecutionBlock child, ExecutionBlock current)
+        throws PlanningException {
+      if (unionScanMap != null) {
+        List<ExecutionBlockId> unionScans = TUtil.newList();
+        ExecutionBlockId representativeId = null;
+        if (unionScanMap.containsKey(child.getId())) {
+          representativeId = unionScanMap.get(child.getId());
+        } else if (unionScanMap.containsValue(child.getId())) {
+          representativeId = child.getId();
+        }
+
+        if (representativeId != null) {
+          for (Map.Entry<ExecutionBlockId, ExecutionBlockId> entry : unionScanMap.entrySet()) {
+            if (entry.getValue().equals(representativeId)) {
+              unionScans.add(entry.getKey());
+            }
+          }
+
+          // add unions
+          LogicalNode left, topUnion = null;
+          left = GlobalPlanner.buildInputExecutor(plan.getLogicalPlan(), plan.getChannel(unionScans.get(0), current.getId()));
+          for (int i = 1; i < unionScans.size(); i++) {
+            // left must not be null
+            UnionNode unionNode = plan.getLogicalPlan().createNode(UnionNode.class);
+            unionNode.setLeftChild(left);
+            unionNode.setRightChild(GlobalPlanner.buildInputExecutor(plan.getLogicalPlan(), plan.getChannel(unionScans.get(i), current.getId())));
+            unionNode.setInSchema(left.getOutSchema());
+            unionNode.setOutSchema(left.getOutSchema());
+            topUnion = unionNode;
+            left = unionNode;
+          }
+
+          ScanNode scanForChild = GlobalPlanRewriteUtil.findScanForChildEb(plan.getExecBlock(representativeId), current);
+          PlannerUtil.replaceNode(plan.getLogicalPlan(), current.getPlan(), scanForChild, topUnion);
+
+          current.getUnionScanMap().clear();
+          current.setPlan(current.getPlan());
+        }
+      }
+    }
+  }
+
+  /**
+   * Tells whether every scan node of the given block has been marked as a broadcast relation.
+   *
+   * @param block execution block to inspect
+   * @return true when the number of broadcast relations equals the number of scan nodes
+   */
+  private static boolean isFullyBroadcastable(ExecutionBlock block) {
+    int broadcastCount = block.getBroadcastRelations().size();
+    int scanCount = block.getScanNodes().length;
+    return broadcastCount == scanCount;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
index e55a258..9148382 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.engine.planner.global.rewriter.rules;
 
+import org.apache.tajo.OverridableConf;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
 import org.apache.tajo.engine.planner.global.MasterPlan;
@@ -38,7 +39,7 @@ public class GlobalPlanEqualityTester implements GlobalPlanRewriteRule {
   }
 
   @Override
-  public boolean isEligible(MasterPlan plan) {
+  public boolean isEligible(OverridableConf queryContext, MasterPlan plan) {
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java
new file mode 100644
index 0000000..cc98300
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global.rewriter.rules;
+
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.logical.*;
+
+import java.util.List;
+
+public class GlobalPlanRewriteUtil {
+  /**
+   * Merge the parent EB with the child EB.
+   *
+   * @param plan
+   * @param child
+   * @param parent
+   * @return
+   */
+  public static ExecutionBlock mergeExecutionBlocks(MasterPlan plan, ExecutionBlock child, ExecutionBlock parent) {
+    for (ScanNode broadcastable : child.getBroadcastRelations()) {
+      parent.addBroadcastRelation(broadcastable);
+    }
+
+    // connect parent and grand children
+    List<ExecutionBlock> grandChilds = plan.getChilds(child);
+    for (ExecutionBlock eachGrandChild : grandChilds) {
+      DataChannel originalChannel = plan.getChannel(eachGrandChild, child);
+      DataChannel newChannel = new DataChannel(eachGrandChild, parent, originalChannel.getShuffleType(),
+          originalChannel.getShuffleOutputNum());
+      newChannel.setSchema(originalChannel.getSchema());
+      newChannel.setShuffleKeys(originalChannel.getShuffleKeys());
+      newChannel.setStoreType(originalChannel.getStoreType());
+      newChannel.setTransmitType(originalChannel.getTransmitType());
+      plan.addConnect(newChannel);
+      plan.disconnect(eachGrandChild, child);
+    }
+
+    plan.disconnect(child, parent);
+    List<DataChannel> channels = plan.getIncomingChannels(child.getId());
+    if (channels == null || channels.size() == 0) {
+      channels = plan.getOutgoingChannels(child.getId());
+      if (channels == null || channels.size() == 0) {
+        plan.removeExecBlock(child.getId());
+      }
+    }
+    return parent;
+  }
+
+  /**
+   * Replace a child of the given parent logical node with the new one.
+   *
+   * @param newChild
+   * @param originalChild
+   * @param parent
+   * @throws PlanningException
+   */
+  public static void replaceChild(LogicalNode newChild, ScanNode originalChild, LogicalNode parent)
+      throws PlanningException {
+    if (parent instanceof UnaryNode) {
+      ((UnaryNode) parent).setChild(newChild);
+    } else if (parent instanceof BinaryNode) {
+      BinaryNode binary = (BinaryNode) parent;
+      if (binary.getLeftChild().equals(originalChild)) {
+        binary.setLeftChild(newChild);
+      } else if (binary.getRightChild().equals(originalChild)) {
+        binary.setRightChild(newChild);
+      } else {
+        throw new PlanningException(originalChild.getPID() + " is not a child of " + parent.getPID());
+      }
+    } else {
+      throw new PlanningException(parent.getPID() + " seems to not have any children");
+    }
+  }
+
+  /**
+   * Find a scan node in the plan of the parent EB corresponding to the output of the child EB.
+   *
+   * @param child
+   * @param parent
+   * @return
+   * @throws PlanningException
+   */
+  public static ScanNode findScanForChildEb(ExecutionBlock child, ExecutionBlock parent) throws PlanningException {
+    ScanNode scanForChild = null;
+    for (ScanNode scanNode : parent.getScanNodes()) {
+      if (scanNode.getTableName().equals(child.getId().toString())) {
+        scanForChild = scanNode;
+        break;
+      }
+    }
+    if (scanForChild == null) {
+      throw new PlanningException("Cannot find any scan nodes for " + child.getId() + " in " + parent.getId());
+    }
+    return scanForChild;
+  }
+
+  /**
+   * Get a volume of a table of a partitioned table
+   * @param scanNode ScanNode corresponding to a table
+   * @return table volume (bytes)
+   */
+  public static long getTableVolume(ScanNode scanNode) {
+    if (scanNode.getTableDesc().hasStats()) {
+      long scanBytes = scanNode.getTableDesc().getStats().getNumBytes();
+      if (scanNode.getType() == NodeType.PARTITIONS_SCAN) {
+        PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) scanNode;
+        if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) {
+          scanBytes = 0L;
+        }
+      }
+
+      return scanBytes;
+    } else {
+      return -1;
+    }
+  }
+
+  /**
+   * It calculates the total volume of all descendent relation nodes.
+   */
+  public static long computeDescendentVolume(LogicalNode node) throws PlanningException {
+
+    if (node instanceof RelationNode) {
+      switch (node.getType()) {
+        case SCAN:
+          ScanNode scanNode = (ScanNode) node;
+          if (scanNode.getTableDesc().getStats() == null) {
+            // TODO - this case means that data is not located in HDFS. So, we need additional
+            // broadcast method.
+            return Long.MAX_VALUE;
+          } else {
+            return scanNode.getTableDesc().getStats().getNumBytes();
+          }
+        case PARTITIONS_SCAN:
+          PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) node;
+          if (pScanNode.getTableDesc().getStats() == null) {
+            // TODO - this case means that data is not located in HDFS. So, we need additional
+            // broadcast method.
+            return Long.MAX_VALUE;
+          } else {
+            // if there is no selected partition
+            if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) {
+              return 0;
+            } else {
+              return pScanNode.getTableDesc().getStats().getNumBytes();
+            }
+          }
+        case TABLE_SUBQUERY:
+          return computeDescendentVolume(((TableSubQueryNode) node).getSubQuery());
+        default:
+          throw new IllegalArgumentException("Not RelationNode");
+      }
+    } else if (node instanceof UnaryNode) {
+      return computeDescendentVolume(((UnaryNode) node).getChild());
+    } else if (node instanceof BinaryNode) {
+      BinaryNode binaryNode = (BinaryNode) node;
+      return computeDescendentVolume(binaryNode.getLeftChild()) + computeDescendentVolume(binaryNode.getRightChild());
+    }
+
+    throw new PlanningException("Invalid State");
+  }
+
+  public static class ParentFinder implements LogicalNodeVisitor {
+    private LogicalNode target;
+    private LogicalNode found;
+
+    public void set(LogicalNode child) {
+      this.target = child;
+      this.found = null;
+    }
+
+    public void find(LogicalNode root) {
+      this.visit(root);
+    }
+
+    public LogicalNode getFound() throws PlanningException {
+      if (found == null) {
+        throw new PlanningException("Cannot find the parent of " + target.getPID());
+      }
+      return this.found;
+    }
+
+    @Override
+    public void visit(LogicalNode node) {
+      for (int i = 0; i < node.childNum(); i++) {
+        if (node.getChild(i).equals(target)) {
+          found = node;
+          break;
+        } else {
+          if (found == null) {
+            visit(node.getChild(i));
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
index ff9b253..588f0fc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
@@ -125,6 +125,19 @@ public abstract class CommonHashJoinExec<T> extends CommonJoinExec {
         keyTuple.put(i, tuple.get(rightKeyList[i]));
       }
 
+      /*
+       * TODO
+       * Currently, some physical executors can return new instances of tuple, but others not.
+       * This sometimes causes wrong results due to the singleton Tuple instance.
+       * The line below is a temporary workaround for this problem.
+       * This will be improved at https://issues.apache.org/jira/browse/TAJO-1343.
+       */
+      try {
+        tuple = tuple.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new IOException(e);
+      }
+
       List<Tuple> newValue = map.get(keyTuple);
       if (newValue == null) {
         map.put(keyTuple, newValue = new ArrayList<Tuple>());

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
index 7c38d36..267bd90 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
@@ -253,7 +253,7 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
       if (aggrFunctions != null) {
         for (AggregationFunctionCallEval eachFunction: aggrFunctions) {
           eachFunction.bind(context.getEvalContext(), inSchema);
-          eachFunction.setFinalPhase();
+          eachFunction.setLastPhase();
         }
       }
       newFunctionContext();

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index 6c1399e..45b23f8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -98,7 +98,7 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
     MasterPlan masterPlan = new MasterPlan(queryId, queryContext, logicalPlan);
     GlobalPlanner globalPlanner = new GlobalPlanner(masterContext.getConf(), masterContext.getCatalog());
     try {
-      globalPlanner.build(masterPlan);
+      globalPlanner.build(queryContext, masterPlan);
     } catch (PlanningException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 6ab096f..22b8c4c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -496,7 +496,7 @@ public class QueryExecutor {
     }
 
     MasterPlan masterPlan = new MasterPlan(QueryIdFactory.NULL_QUERY_ID, context, plan);
-    planner.build(masterPlan);
+    planner.build(context, masterPlan);
 
     return masterPlan;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
index 351856f..939de60 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
@@ -865,7 +865,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       }
 
       ExecutionBlock parent = masterPlan.getParent(block);
-      if (masterPlan.isRoot(parent) && parent.hasUnion()) {
+      if (masterPlan.isRoot(parent) && parent.isUnionOnly()) {
         return false;
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8fd9ae72/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index a48f0a0..2809a70 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -363,7 +363,7 @@ public class QueryMasterTask extends CompositeService {
         }
       }
       MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
-      queryMasterContext.getGlobalPlanner().build(masterPlan);
+      queryMasterContext.getGlobalPlanner().build(queryContext, masterPlan);
 
       query = new Query(queryTaskContext, queryId, querySubmitTime,
           "", queryTaskContext.getEventHandler(), masterPlan);