You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2016/03/04 08:12:48 UTC

tajo git commit: TAJO-2082: Aggregation on a derived table which includes union can cause incorrect results.

Repository: tajo
Updated Branches:
  refs/heads/master fdb76ed2c -> 7b0af7448


TAJO-2082: Aggregation on a derived table which includes union can cause incorrect results.

Closes #969


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7b0af744
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7b0af744
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7b0af744

Branch: refs/heads/master
Commit: 7b0af74483521615f302d2a3376556dad325297f
Parents: fdb76ed
Author: Jihoon Son <ji...@apache.org>
Authored: Fri Mar 4 16:12:16 2016 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri Mar 4 16:12:16 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../TooLargeInputForCrossJoinException.java     |   2 +-
 .../apache/tajo/util/graph/DirectedGraph.java   |   3 +-
 .../tajo/util/graph/DirectedGraphVisitor.java   |   4 +-
 .../tajo/util/graph/SimpleDirectedGraph.java    |  13 +-
 .../util/graph/TestSimpleDirectedGraph.java     |   3 +-
 .../tajo/engine/planner/global/DataChannel.java |   8 +
 .../engine/planner/global/ExecutionBlock.java   | 161 +++++++++-----
 .../engine/planner/global/GlobalPlanner.java    |  21 +-
 .../tajo/engine/planner/global/MasterPlan.java  |  51 ++++-
 .../global/builder/DistinctGroupbyBuilder.java  |   3 +-
 .../rewriter/rules/BroadcastJoinRule.java       |   9 +-
 .../planner/physical/ExternalSortExec.java      |   2 +-
 .../apache/tajo/querymaster/Repartitioner.java  |  29 ++-
 .../java/org/apache/tajo/querymaster/Stage.java | 222 +++++++++++--------
 .../plan/rewrite/SelfDescSchemaBuildPhase.java  |   2 +-
 .../org/apache/tajo/plan/util/PlannerUtil.java  |   2 +-
 17 files changed, 343 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 7892aa2..592bfe3 100644
--- a/CHANGES
+++ b/CHANGES
@@ -106,6 +106,9 @@ Release 0.12.0 - unreleased
 
   BUG FIXES
 
+    TAJO-2082: Aggregation on a derived table which includes union can cause
+    incorrect results. (jihoon)
+
     TAJO-2081: Incorrect task locality on single node. (jinho)
 
     TAJO-2080: ArrayIndexOutOfBoundsException when performing aggregation on an 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java b/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java
index 55d5f46..d958cc6 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java
@@ -33,6 +33,6 @@ public class TooLargeInputForCrossJoinException extends TajoException {
   }
 
   public TooLargeInputForCrossJoinException(String[] relations, long currentBroadcastThreshold) {
-    super(ResultCode.TOO_LARGE_INPUT_FOR_CROSS_JOIN, StringUtils.join(relations), currentBroadcastThreshold + " MB");
+    super(ResultCode.TOO_LARGE_INPUT_FOR_CROSS_JOIN, StringUtils.join(relations), currentBroadcastThreshold + " KB");
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java
index d8d5ced..ae61ed8 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.util.graph;
 
 import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.exception.TajoException;
 
 import java.util.List;
 
@@ -60,5 +61,5 @@ public interface DirectedGraph<V, E> extends Graph<V, E> {
   /**
    * It visits all vertices in a post-order traverse way.
    */
-  <CONTEXT> void accept(CONTEXT context, V src, DirectedGraphVisitor<CONTEXT, V> visitor);
+  <CONTEXT> void accept(CONTEXT context, V src, DirectedGraphVisitor<CONTEXT, V> visitor) throws TajoException;
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java
index 8e0ce87..86f1856 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java
@@ -18,8 +18,10 @@
 
 package org.apache.tajo.util.graph;
 
+import org.apache.tajo.exception.TajoException;
+
 import java.util.Stack;
 
 public interface DirectedGraphVisitor<CONTEXT, V> {
-  void visit(CONTEXT context, Stack<V> stack, V v);
+  void visit(CONTEXT context, Stack<V> stack, V v) throws TajoException;
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java b/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java
index b5e36e7..e1ba137 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java
@@ -21,6 +21,8 @@ package org.apache.tajo.util.graph;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.exception.TajoRuntimeException;
 import org.apache.tajo.util.TUtil;
 
 import java.util.*;
@@ -219,13 +221,14 @@ public class SimpleDirectedGraph<V, E> implements DirectedGraph<V,E> {
   }
 
   @Override
-  public <CONTEXT> void accept(CONTEXT context, V source, DirectedGraphVisitor<CONTEXT, V> visitor) {
+  public <CONTEXT> void accept(CONTEXT context, V source, DirectedGraphVisitor<CONTEXT, V> visitor)
+      throws TajoException {
     Stack<V> stack = new Stack<>();
     visitRecursive(context, stack, source, visitor);
   }
 
   private <CONTEXT> void visitRecursive(CONTEXT context, Stack<V> stack, V current,
-                                        DirectedGraphVisitor<CONTEXT, V> visitor) {
+                                        DirectedGraphVisitor<CONTEXT, V> visitor) throws TajoException {
     stack.push(current);
     for (V child : getChilds(current)) {
       visitRecursive(context, stack, child, visitor);
@@ -249,7 +252,11 @@ public class SimpleDirectedGraph<V, E> implements DirectedGraph<V,E> {
   public String toStringGraph(V vertex) {
     StringBuilder sb = new StringBuilder();
     QueryGraphTopologyStringBuilder visitor = new QueryGraphTopologyStringBuilder();
-    accept(null, vertex, visitor);
+    try {
+      accept(null, vertex, visitor);
+    } catch (TajoException e) {
+      throw new TajoRuntimeException(e);
+    }
     Stack<DepthString> depthStrings = visitor.getDepthStrings();
     while(!depthStrings.isEmpty()) {
       sb.append(printDepthString(depthStrings.pop()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java b/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java
index 45cde2a..9ebb69b 100644
--- a/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java
+++ b/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java
@@ -20,6 +20,7 @@ package org.apache.tajo.util.graph;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.util.graph.DirectedGraphVisitor;
 import org.apache.tajo.util.graph.SimpleDirectedGraph;
 import org.junit.Test;
@@ -34,7 +35,7 @@ public class TestSimpleDirectedGraph {
   private static final Log LOG = LogFactory.getLog(TestSimpleDirectedGraph.class);
 
   @Test
-  public final void test() {
+  public final void test() throws TajoException {
     SimpleDirectedGraph<String, Integer> graph = new SimpleDirectedGraph<>();
 
     //     root

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index c779d2f..10e9973 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -99,6 +99,14 @@ public class DataChannel {
     return shuffleType;
   }
 
+  public boolean isHashShuffle() {
+    return shuffleType == ShuffleType.HASH_SHUFFLE || shuffleType == ShuffleType.SCATTERED_HASH_SHUFFLE;
+  }
+
+  public boolean isRangeShuffle() {
+    return shuffleType == ShuffleType.RANGE_SHUFFLE;
+  }
+
   public boolean needShuffle() {
     return shuffleType != ShuffleType.NONE_SHUFFLE;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 573f5aa..fde05c5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -16,7 +16,10 @@ package org.apache.tajo.engine.planner.global;
 
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
 
 import java.util.*;
 
@@ -30,19 +33,14 @@ import java.util.*;
 public class ExecutionBlock {
   private ExecutionBlockId executionBlockId;
   private LogicalNode plan = null;
-  private StoreTableNode store = null;
-  private List<ScanNode> scanlist = new ArrayList<>();
   private Enforcer enforcer = new Enforcer();
-
   // Actual ScanNode's ExecutionBlockId -> Delegated ScanNode's ExecutionBlockId.
   private Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = new HashMap<>();
 
-  private boolean hasJoinPlan;
-  private boolean hasUnionPlan;
-  private boolean isUnionOnly;
-
   private Map<String, ScanNode> broadcastRelations = new HashMap<>();
 
+  private PlanContext planContext;
+
   /*
    * An execution block is null-supplying or preserved-row when its output is used as an input for outer join.
    * These properties are decided based on the type of parent execution block's outer join.
@@ -98,52 +96,16 @@ public class ExecutionBlock {
     return executionBlockId;
   }
 
-  public void setPlan(LogicalNode plan) {
-    hasJoinPlan = false;
-    hasUnionPlan = false;
-    isUnionOnly = true;
-    this.scanlist.clear();
+  public void setPlan(LogicalNode plan) throws TajoException {
     this.plan = plan;
 
     if (plan == null) {
       return;
     }
 
-    LogicalNode node = plan;
-    ArrayList<LogicalNode> s = new ArrayList<>();
-    s.add(node);
-    while (!s.isEmpty()) {
-      node = s.remove(s.size()-1);
-      // TODO: the below code should be improved to handle every case
-      if (isUnionOnly && node.getType() != NodeType.ROOT && node.getType() != NodeType.TABLE_SUBQUERY &&
-          node.getType() != NodeType.SCAN && node.getType() != NodeType.PARTITIONS_SCAN &&
-          node.getType() != NodeType.UNION && node.getType() != NodeType.PROJECTION) {
-        isUnionOnly = false;
-      }
-      if (node instanceof UnaryNode) {
-        UnaryNode unary = (UnaryNode) node;
-        s.add(s.size(), unary.getChild());
-      } else if (node instanceof BinaryNode) {
-        BinaryNode binary = (BinaryNode) node;
-        if (binary.getType() == NodeType.JOIN) {
-          hasJoinPlan = true;
-        } else if (binary.getType() == NodeType.UNION) {
-          hasUnionPlan = true;
-        }
-        s.add(s.size(), binary.getLeftChild());
-        s.add(s.size(), binary.getRightChild());
-      } else if (node instanceof ScanNode) {
-        scanlist.add((ScanNode)node);
-      } else if (node instanceof TableSubQueryNode) {
-        TableSubQueryNode subQuery = (TableSubQueryNode) node;
-        s.add(s.size(), subQuery.getSubQuery());
-      } else if (node instanceof StoreTableNode) {
-        store = (StoreTableNode)node;
-      }
-    }
-    if (!hasUnionPlan) {
-      isUnionOnly = false;
-    }
+    final PlanVisitor visitor = new PlanVisitor();
+    planContext = new PlanContext();
+    visitor.visit(planContext, null, null, plan, new Stack<>());
   }
 
   public void addUnionScan(ExecutionBlockId realScanEbId, ExecutionBlockId delegatedScanEbId) {
@@ -163,12 +125,12 @@ public class ExecutionBlock {
   }
 
   public StoreTableNode getStoreTableNode() {
-    return store;
+    return planContext.store;
   }
 
   public int getNonBroadcastRelNum() {
     int nonBroadcastRelNum = 0;
-    for (ScanNode scanNode : scanlist) {
+    for (ScanNode scanNode : planContext.scanlist) {
       if (!broadcastRelations.containsKey(scanNode.getCanonicalName())) {
         nonBroadcastRelNum++;
       }
@@ -177,19 +139,23 @@ public class ExecutionBlock {
   }
 
   public ScanNode [] getScanNodes() {
-    return this.scanlist.toArray(new ScanNode[scanlist.size()]);
+    return planContext.scanlist.toArray(new ScanNode[planContext.scanlist.size()]);
   }
 
   public boolean hasJoin() {
-    return hasJoinPlan;
+    return planContext.hasJoinPlan;
   }
 
   public boolean hasUnion() {
-    return hasUnionPlan;
+    return planContext.hasUnionPlan;
+  }
+
+  public boolean hasAgg() {
+    return planContext.hasAggPlan;
   }
 
   public boolean isUnionOnly() {
-    return isUnionOnly;
+    return planContext.isUnionOnly();
   }
 
   public void addBroadcastRelation(ScanNode relationNode) {
@@ -235,4 +201,93 @@ public class ExecutionBlock {
   public boolean isPreservedRow() {
     return preservedRow;
   }
+
+  private class PlanContext {
+    StoreTableNode store = null;
+
+    List<ScanNode> scanlist = new ArrayList<>();
+
+    boolean hasJoinPlan = false;
+    boolean hasUnionPlan = false;
+    boolean hasAggPlan = false;
+    boolean hasSortPlan = false;
+
+    boolean isUnionOnly() {
+      return hasUnionPlan && !hasJoinPlan && !hasAggPlan && !hasSortPlan;
+    }
+  }
+
+  private class PlanVisitor extends BasicLogicalPlanVisitor<PlanContext, LogicalNode> {
+
+    @Override
+    public LogicalNode visitJoin(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node,
+                                 Stack<LogicalNode> stack) throws TajoException {
+      context.hasJoinPlan = true;
+      return super.visitJoin(context, plan, block, node, stack);
+    }
+
+    @Override
+    public LogicalNode visitGroupBy(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, GroupbyNode node,
+                                    Stack<LogicalNode> stack) throws TajoException {
+      context.hasAggPlan = true;
+      return super.visitGroupBy(context, plan, block, node, stack);
+    }
+
+    @Override
+    public LogicalNode visitWindowAgg(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, WindowAggNode node,
+                                      Stack<LogicalNode> stack) throws TajoException {
+      context.hasAggPlan = true;
+      return super.visitWindowAgg(context, plan, block, node, stack);
+    }
+
+    @Override
+    public LogicalNode visitDistinctGroupby(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                            DistinctGroupbyNode node, Stack<LogicalNode> stack) throws TajoException {
+      context.hasAggPlan = true;
+      return super.visitDistinctGroupby(context, plan, block, node, stack);
+    }
+
+    @Override
+    public LogicalNode visitSort(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, SortNode node,
+                                 Stack<LogicalNode> stack) throws TajoException {
+      context.hasSortPlan = true;
+      return super.visitSort(context, plan, block, node, stack);
+    }
+
+    @Override
+    public LogicalNode visitUnion(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, UnionNode node,
+                                  Stack<LogicalNode> stack) throws TajoException {
+      context.hasUnionPlan = true;
+      return super.visitUnion(context, plan, block, node, stack);
+    }
+
+    @Override
+    public LogicalNode visitStoreTable(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, StoreTableNode node,
+                                       Stack<LogicalNode> stack) throws TajoException {
+      context.store = node;
+      return super.visitStoreTable(context, plan, block, node, stack);
+    }
+
+    @Override
+    public LogicalNode visitScan(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node,
+                                 Stack<LogicalNode> stack) throws TajoException {
+      context.scanlist.add(node);
+      return super.visitScan(context, plan, block, node, stack);
+    }
+
+    @Override
+    public LogicalNode visitPartitionedTableScan(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                                 PartitionedTableScanNode node, Stack<LogicalNode> stack)
+        throws TajoException {
+      context.scanlist.add(node);
+      return super.visitPartitionedTableScan(context, plan, block, node, stack);
+    }
+
+    @Override
+    public LogicalNode visitIndexScan(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, IndexScanNode node,
+                                      Stack<LogicalNode> stack) throws TajoException {
+      context.scanlist.add(node);
+      return super.visitIndexScan(context, plan, block, node, stack);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index bf41d5b..463d015 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -199,7 +199,7 @@ public class GlobalPlanner {
   }
 
   private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode,
-                                        ExecutionBlock leftBlock, ExecutionBlock rightBlock) {
+                                        ExecutionBlock leftBlock, ExecutionBlock rightBlock) throws TajoException {
     MasterPlan masterPlan = context.plan;
     ExecutionBlock currentBlock;
 
@@ -596,7 +596,7 @@ public class GlobalPlanner {
   }
 
   private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock lastBlock,
-                                      GroupbyNode groupbyNode) {
+                                      GroupbyNode groupbyNode) throws TajoException {
     MasterPlan masterPlan = context.plan;
     ExecutionBlock currentBlock;
 
@@ -675,7 +675,7 @@ public class GlobalPlanner {
   }
 
   private ExecutionBlock buildGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock,
-                                                  GroupbyNode firstPhaseGroupBy, GroupbyNode secondPhaseGroupBy) {
+                                                  GroupbyNode firstPhaseGroupBy, GroupbyNode secondPhaseGroupBy) throws TajoException {
     DataChannel lastDataChannel = null;
 
     // It pushes down the first phase group-by operator into all child blocks.
@@ -715,7 +715,7 @@ public class GlobalPlanner {
   }
 
   private ExecutionBlock buildTwoPhaseGroupby(MasterPlan masterPlan, ExecutionBlock latestBlock,
-                                                     GroupbyNode firstPhaseGroupby, GroupbyNode secondPhaseGroupby) {
+                                                     GroupbyNode firstPhaseGroupby, GroupbyNode secondPhaseGroupby) throws TajoException {
 
     ExecutionBlock childBlock = latestBlock;
     childBlock.setPlan(firstPhaseGroupby);
@@ -780,7 +780,7 @@ public class GlobalPlanner {
     return firstPhaseGroupBy;
   }
 
-  private ExecutionBlock buildSortPlan(GlobalPlanContext context, ExecutionBlock childBlock, SortNode currentNode) {
+  private ExecutionBlock buildSortPlan(GlobalPlanContext context, ExecutionBlock childBlock, SortNode currentNode) throws TajoException {
     MasterPlan masterPlan = context.plan;
     ExecutionBlock currentBlock;
 
@@ -860,7 +860,7 @@ public class GlobalPlanner {
    */
   private ExecutionBlock buildShuffleAndStorePlanNoPartitionedTableWithUnion(GlobalPlanContext context,
                                                                              StoreTableNode currentNode,
-                                                                             ExecutionBlock childBlock) {
+                                                                             ExecutionBlock childBlock) throws TajoException {
     for (ExecutionBlock grandChildBlock : context.plan.getChilds(childBlock)) {
       StoreTableNode copy = PlannerUtil.clone(context.plan.getLogicalPlan(), currentNode);
       copy.setChild(grandChildBlock.getPlan());
@@ -875,7 +875,7 @@ public class GlobalPlanner {
    */
   private ExecutionBlock buildShuffleAndStorePlanToPartitionedTableWithUnion(GlobalPlanContext context,
                                                                              StoreTableNode currentNode,
-                                                                             ExecutionBlock lastBlock) {
+                                                                             ExecutionBlock lastBlock) throws TajoException {
 
     MasterPlan masterPlan = context.plan;
     DataChannel lastChannel = null;
@@ -899,7 +899,7 @@ public class GlobalPlanner {
    */
   private ExecutionBlock buildShuffleAndStorePlanToPartitionedTable(GlobalPlanContext context,
                                                                     StoreTableNode currentNode,
-                                                                    ExecutionBlock lastBlock) {
+                                                                    ExecutionBlock lastBlock) throws TajoException {
     MasterPlan masterPlan = context.plan;
 
     ExecutionBlock nextBlock = masterPlan.newExecutionBlock();
@@ -920,7 +920,7 @@ public class GlobalPlanner {
 
   private ExecutionBlock buildNoPartitionedStorePlan(GlobalPlanContext context,
                                                      StoreTableNode currentNode,
-                                                     ExecutionBlock childBlock) {
+                                                     ExecutionBlock childBlock) throws TajoException {
     if (hasUnionChild(currentNode)) { // when the below is union
       return buildShuffleAndStorePlanNoPartitionedTableWithUnion(context, currentNode, childBlock);
     } else {
@@ -1240,7 +1240,8 @@ public class GlobalPlanner {
       return node;
     }
 
-    private LogicalNode handleUnaryNode(GlobalPlanContext context, LogicalNode child, LogicalNode node) {
+    private LogicalNode handleUnaryNode(GlobalPlanContext context, LogicalNode child, LogicalNode node)
+        throws TajoException {
       ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
       execBlock.setPlan(node);
       context.execBlockMap.put(node.getPID(), execBlock);

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index feaba76..a7b03e7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -23,8 +23,10 @@ package org.apache.tajo.engine.planner.global;
 
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.annotation.NotNull;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
 import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
@@ -47,6 +49,44 @@ public class MasterPlan {
   private SimpleDirectedGraph<ExecutionBlockId, DataChannel> execBlockGraph =
           new SimpleDirectedGraph<>();
 
+  private Map<ExecutionBlockId, ShuffleContext> shuffleInfo = new HashMap<>();
+
+  /**
+   * A shuffle information entry that holds an execution block ID and a partition number.
+   */
+  public class ShuffleContext {
+    ExecutionBlockId parentEbId;
+    int partitionNum;
+
+    public ShuffleContext(ExecutionBlockId parentEbId, int partitionNum) {
+      this.parentEbId = parentEbId;
+      this.partitionNum = partitionNum;
+    }
+
+    public ExecutionBlockId getParentEbId() {
+      return parentEbId;
+    }
+
+    public int getPartitionNum() {
+      return partitionNum;
+    }
+  }
+
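+  /**
+   * Records a {@link ShuffleContext} for the given block, keyed by the ID of that block's parent.
+   * @param ebId ID of the execution block whose parent's ID is used as the map key
+   * @param partitionNum partition number stored in the recorded {@link ShuffleContext}
+   */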
+  public void addShuffleInfo(ExecutionBlockId ebId, int partitionNum) {
+    ExecutionBlockId parentId = getParent(getExecBlock(ebId)).getId();
+    shuffleInfo.put(parentId, new ShuffleContext(ebId, partitionNum));
+  }
+
+  public Optional<ShuffleContext> getShuffleInfo(ExecutionBlockId ebId) {
+    ExecutionBlockId parentId = getParent(getExecBlock(ebId)).getId();
+    return shuffleInfo.containsKey(parentId) ? Optional.of(shuffleInfo.get(parentId)) : Optional.empty();
+  }
+
   public ExecutionBlockId newExecutionBlockId() {
     return new ExecutionBlockId(queryId, nextId.incrementAndGet());
   }
@@ -215,14 +255,14 @@ public class MasterPlan {
     return getChild(executionBlock.getId(), idx);
   }
 
-  public <CONTEXT> void accept(CONTEXT context, ExecutionBlockId v, DirectedGraphVisitor<CONTEXT, ExecutionBlockId> visitor) {
+  public <CONTEXT> void accept(CONTEXT context, ExecutionBlockId v, DirectedGraphVisitor<CONTEXT,
+      ExecutionBlockId> visitor) throws TajoException {
     execBlockGraph.accept(context, v, visitor);
   }
 
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    ExecutionBlockCursor cursor = new ExecutionBlockCursor(this);
     sb.append("-------------------------------------------------------------------------------\n");
     sb.append("Execution Block Graph (TERMINAL - " + getTerminalBlock() + ")\n");
     sb.append("-------------------------------------------------------------------------------\n");
@@ -285,12 +325,7 @@ public class MasterPlan {
         sb.append("\n[Enforcers]\n");
         int i = 0;
         List<EnforceProperty> enforceProperties = block.getEnforcer().getProperties();
-        Collections.sort(enforceProperties, new Comparator<EnforceProperty>() {
-          @Override
-          public int compare(EnforceProperty o1, EnforceProperty o2) {
-            return o1.toString().compareTo(o2.toString());
-          }
-        });
+        Collections.sort(enforceProperties, (e1, e2) -> e1.toString().compareTo(e2.toString()));
         for (EnforceProperty enforce : enforceProperties) {
           sb.append(" ").append(i++).append(": ");
           sb.append(Enforcer.toString(enforce));

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
index 592ea2b..8f7673b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
@@ -30,6 +30,7 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.GlobalPlanner.GlobalPlanContext;
 import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
 import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
@@ -711,7 +712,7 @@ public class DistinctGroupbyBuilder {
 
   private ExecutionBlock buildDistinctGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock,
                                                          DistinctGroupbyNode firstPhaseGroupBy,
-                                                         DistinctGroupbyNode secondPhaseGroupBy) {
+                                                         DistinctGroupbyNode secondPhaseGroupBy) throws TajoException {
     DataChannel lastDataChannel = null;
 
     // It pushes down the first phase group-by operator into all child blocks.

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
index a19704b..d390740 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
@@ -175,7 +175,8 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
     }
 
     @Override
-    public void visit(Context context, Stack<ExecutionBlockId> stack, ExecutionBlockId executionBlockId) {
+    public void visit(Context context, Stack<ExecutionBlockId> stack, ExecutionBlockId executionBlockId)
+        throws TajoException {
       ExecutionBlock current = plan.getExecBlock(executionBlockId);
 
       if (plan.isLeaf(current)) {
@@ -209,7 +210,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
      *
      * @param current
      */
-    private void visitNonLeafNode(Context context, ExecutionBlock current) {
+    private void visitNonLeafNode(Context context, ExecutionBlock current) throws TajoException {
       // At non-leaf execution blocks, merge broadcastable children's plan with the current plan.
 
       if (!plan.isTerminal(current)) {
@@ -423,7 +424,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
      * @param parent parent block who has join nodes
      * @return
      */
-    private ExecutionBlock mergeTwoPhaseJoinIfPossible(MasterPlan plan, ExecutionBlock child, ExecutionBlock parent) {
+    private ExecutionBlock mergeTwoPhaseJoinIfPossible(MasterPlan plan, ExecutionBlock child, ExecutionBlock parent) throws TajoException {
       ScanNode scanForChild = findScanForChildEb(child, parent);
 
       parentFinder.set(scanForChild);
@@ -446,7 +447,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
     }
 
     private void addUnionNodeIfNecessary(Map<ExecutionBlockId, ExecutionBlockId> unionScanMap, MasterPlan plan,
-                                         ExecutionBlock child, ExecutionBlock current) {
+                                         ExecutionBlock child, ExecutionBlock current) throws TajoException {
       if (unionScanMap != null) {
         List<ExecutionBlockId> unionScans = new ArrayList<>();
         ExecutionBlockId representativeId = null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 0e89928..ff629c3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -85,7 +85,7 @@ public class ExternalSortExec extends SortExec {
   /** the defaultFanout of external sort */
   private final int defaultFanout;
   /** It's the size of in-memory table. If memory consumption exceeds it, store the memory table into a disk. */
-  private int sortBufferBytesNum;
+  private final long sortBufferBytesNum;
   /** the number of available cores */
   private final int allocatedCoreNum;
   /** If there are available multiple cores, it tries parallel merge. */

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index ec7ed2d..4e4251a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -50,7 +50,6 @@ import org.apache.tajo.plan.logical.SortNode.SortPurpose;
 import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
 import org.apache.tajo.querymaster.Task.PullHost;
 import org.apache.tajo.storage.*;
@@ -321,12 +320,12 @@ public class Repartitioner {
             if (tbNameToInterm.containsKey(scanEbId)) {
               tbNameToInterm.get(scanEbId).add(intermediateEntry);
             } else {
-              tbNameToInterm.put(scanEbId, new ArrayList<>(Arrays.asList(intermediateEntry)));
+              tbNameToInterm.put(scanEbId, Lists.newArrayList(intermediateEntry));
             }
           } else {
             Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm =
                     new HashMap<>();
-            tbNameToInterm.put(scanEbId, new ArrayList<>(Arrays.asList(intermediateEntry)));
+            tbNameToInterm.put(scanEbId, Lists.newArrayList(intermediateEntry));
             hashEntries.put(intermediateEntry.getPartId(), tbNameToInterm);
           }
         }
@@ -606,10 +605,9 @@ public class Repartitioner {
                                                       MasterPlan masterPlan, Stage stage, int maxNum)
       throws IOException {
     DataChannel channel = masterPlan.getIncomingChannels(stage.getBlock().getId()).get(0);
-    if (channel.getShuffleType() == HASH_SHUFFLE
-        || channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) {
+    if (channel.isHashShuffle()) {
       scheduleHashShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum);
-    } else if (channel.getShuffleType() == RANGE_SHUFFLE) {
+    } else if (channel.isRangeShuffle()) {
       scheduleRangeShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum);
     } else {
       throw new TajoInternalError("Cannot support partition type");
@@ -698,10 +696,8 @@ public class Repartitioner {
 
       TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges);
       if (LOG.isDebugEnabled()) {
-        if (ranges != null) {
-          for (TupleRange eachRange : ranges) {
-            LOG.debug(stage.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd());
-          }
+        for (TupleRange eachRange : ranges) {
+          LOG.debug(stage.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd());
         }
       }
     }
@@ -999,7 +995,7 @@ public class Repartitioner {
         int partId = eachInterm.getPartId();
         List<IntermediateEntry> partitionInterms = partitionIntermMap.get(partId);
         if (partitionInterms == null) {
-          partitionInterms = Arrays.asList(eachInterm);
+          partitionInterms = Lists.newArrayList(eachInterm);
           partitionIntermMap.put(partId, partitionInterms);
         } else {
           partitionInterms.add(eachInterm);
@@ -1078,7 +1074,7 @@ public class Repartitioner {
           fetchListVolume = 0;
         }
         FetchImpl fetch = new FetchImpl(fetchName, currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE,
-            ebId, currentInterm.getPartId(), Arrays.asList(currentInterm));
+            ebId, currentInterm.getPartId(), Lists.newArrayList(currentInterm));
         fetch.setOffset(eachSplit.getFirst());
         fetch.setLength(eachSplit.getSecond());
         fetchListForSingleTask.add(fetch.getProto());
@@ -1219,7 +1215,7 @@ public class Repartitioner {
       if (hashed.containsKey(entry.getPartId())) {
         hashed.get(entry.getPartId()).add(entry);
       } else {
-        hashed.put(entry.getPartId(), Arrays.asList(entry));
+        hashed.put(entry.getPartId(), Lists.newArrayList(entry));
       }
     }
 
@@ -1235,7 +1231,7 @@ public class Repartitioner {
       if (hashed.containsKey(host)) {
         hashed.get(host).add(entry);
       } else {
-        hashed.put(host, Arrays.asList(entry));
+        hashed.put(host, Lists.newArrayList(entry));
       }
     }
 
@@ -1258,12 +1254,12 @@ public class Repartitioner {
     }
 
     // set the partition number for group by and sort
-    if (channel.getShuffleType() == HASH_SHUFFLE) {
+    if (channel.isHashShuffle()) {
       if (execBlock.getPlan().getType() == NodeType.GROUP_BY ||
           execBlock.getPlan().getType() == NodeType.DISTINCT_GROUP_BY) {
         keys = channel.getShuffleKeys();
       }
-    } else if (channel.getShuffleType() == RANGE_SHUFFLE) {
+    } else if (channel.isRangeShuffle()) {
       if (execBlock.getPlan().getType() == NodeType.SORT) {
         SortNode sort = (SortNode) execBlock.getPlan();
         keys = new Column[sort.getSortKeys().length];
@@ -1278,6 +1274,7 @@ public class Repartitioner {
         channel.setShuffleOutputNum(1);
       } else {
         channel.setShuffleKeys(keys);
+        // NOTE: desiredNum is not used in Sort anymore.
         channel.setShuffleOutputNum(desiredNum);
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index f1813c9..08ff184 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -42,14 +42,17 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.global.MasterPlan.ShuffleContext;
 import org.apache.tajo.error.Errors.SerializedException;
 import org.apache.tajo.exception.ErrorUtil;
 import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.TaskState;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
 import org.apache.tajo.plan.util.PlannerUtil;
@@ -855,6 +858,7 @@ public class Stage implements EventHandler<StageEvent> {
           ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock());
           DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId());
           setShuffleIfNecessary(stage, channel);
+          // TODO: verify changed shuffle plan
           initTaskScheduler(stage);
           // execute pre-processing asyncronously
           stage.getContext().getQueryMasterContext().getSingleEventExecutor()
@@ -920,8 +924,8 @@ public class Stage implements EventHandler<StageEvent> {
      * methods and the number of partitions to a given Stage.
      */
     private static void setShuffleIfNecessary(Stage stage, DataChannel channel) {
-      if (channel.getShuffleType() != ShuffleType.NONE_SHUFFLE) {
-        int numTasks = calculateShuffleOutputNum(stage, channel);
+      if (channel.isHashShuffle()) {
+        int numTasks = calculateShuffleOutputNum(stage);
         Repartitioner.setShuffleOutputNumForTwoPhase(stage, numTasks, channel);
       }
     }
@@ -933,122 +937,154 @@ public class Stage implements EventHandler<StageEvent> {
      * @param stage
      * @return
      */
-    public static int calculateShuffleOutputNum(Stage stage, DataChannel channel) {
+    public static int calculateShuffleOutputNum(Stage stage) {
       MasterPlan masterPlan = stage.getMasterPlan();
-      ExecutionBlock parent = masterPlan.getParent(stage.getBlock());
 
-      LogicalNode grpNode = null;
-      if (parent != null) {
-        grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY);
-        if (grpNode == null) {
-          grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.DISTINCT_GROUP_BY);
+      // For test
+      if (masterPlan.getContext().containsKey(SessionVars.TEST_MIN_TASK_NUM)) {
+        int partitionNum = masterPlan.getContext().getInt(SessionVars.TEST_MIN_TASK_NUM);
+        LOG.info(stage.getId() + ", The determined number of partitions is " + partitionNum + " for test");
+        return partitionNum;
+      }
+
+      Optional<ShuffleContext> optional = masterPlan.getShuffleInfo(stage.getId());
+      if (optional.isPresent()) {
+        LOG.info("# of partitions is determined as " + optional.get().getPartitionNum() +
+            "to match with sibling eb's partition number");
+        return optional.get().getPartitionNum();
+
+      } else {
+        ExecutionBlock parent = masterPlan.getParent(stage.getBlock());
+        int partitionNum;
+
+        if (parent != null) {
+          // We assume this execution block is the first stage of a join if two or more tables are included in this block.
+          if (parent.hasJoin()) {
+            if (parent.getNonBroadcastRelNum() > 1) {
+              // repartition join
+              partitionNum = calculatePartitionNumForRepartitionJoin(parent, stage);
+              LOG.info(stage.getId() + ", The determined number of partitions for repartition join is " + partitionNum);
+            } else {
+              // broadcast join
+              // partition number is calculated using the volume of the large table
+              partitionNum = calculatePartitionNumDefault(parent, stage);
+              LOG.info(stage.getId() + ", The determined number of partitions for broadcast join is " + partitionNum);
+            }
+
+          } else {
+            // Is this stage the first step of group-by?
+            if (parent.hasAgg()) {
+              LogicalNode grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY,
+                  NodeType.DISTINCT_GROUP_BY, NodeType.WINDOW_AGG);
+              if (grpNode == null) {
+                throw new TajoInternalError("Cannot find aggregation plan for " + stage.getId());
+              }
+
+              if (!hasGroupKeys(stage, grpNode)) {
+                LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
+                partitionNum = 1;
+              } else {
+                partitionNum = calculatePartitionNumForAgg(parent, stage);
+                LOG.info(stage.getId() + ", The determined number of aggregation partitions is " + partitionNum);
+              }
+
+            } else {
+              // NOTE: the below code might be executed during sort, but the partition number is not used anymore for sort.
+              LOG.info("============>>>>> Unexpected Case! <<<<<================");
+              partitionNum = calculatePartitionNumDefault(parent, stage);
+              LOG.info(stage.getId() + ", The determined number of partitions is " + partitionNum);
+            }
+
+          }
+        } else {
+          // This case means that the parent eb does not exist even though data shuffle is required after the current eb.
+          throw new TajoInternalError("Cannot find parent execution block of " + stage.block.getId());
         }
+
+        // Record the partition number for sibling execution blocks
+        masterPlan.addShuffleInfo(stage.getId(), partitionNum);
+        return partitionNum;
       }
+    }
+
+    private static int calculatePartitionNumForRepartitionJoin(ExecutionBlock parent, Stage currentStage) {
+      List<ExecutionBlock> childs = currentStage.masterPlan.getChilds(parent);
 
-      // We assume this execution block the first stage of join if two or more tables are included in this block,
-      if (parent != null && (parent.getNonBroadcastRelNum()) >= 2) {
-        List<ExecutionBlock> childs = masterPlan.getChilds(parent);
+      // for outer
+      ExecutionBlock outer = childs.get(0);
+      long outerVolume = getInputVolume(currentStage.masterPlan, currentStage.context, outer);
 
-        // for outer
-        ExecutionBlock outer = childs.get(0);
-        long outerVolume = getInputVolume(stage.masterPlan, stage.context, outer);
+      // for inner
+      ExecutionBlock inner = childs.get(1);
+      long innerVolume = getInputVolume(currentStage.masterPlan, currentStage.context, inner);
+      LOG.info(currentStage.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, "
+          + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB");
 
-        // for inner
-        ExecutionBlock inner = childs.get(1);
-        long innerVolume = getInputVolume(stage.masterPlan, stage.context, inner);
-        LOG.info(stage.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, "
-            + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB");
+      long bigger = Math.max(outerVolume, innerVolume);
 
-        long bigger = Math.max(outerVolume, innerVolume);
+      int mb = (int) Math.ceil((double) bigger / 1048576);
+      LOG.info(currentStage.getId() + ", Bigger Table's volume is approximately " + mb + " MB");
 
-        int mb = (int) Math.ceil((double) bigger / 1048576);
-        LOG.info(stage.getId() + ", Bigger Table's volume is approximately " + mb + " MB");
+      return (int) Math.ceil((double) mb /
+          currentStage.masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE));
+    }
 
-        int taskNum = (int) Math.ceil((double) mb / masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE));
+    private static int calculatePartitionNumForAgg(ExecutionBlock parent, Stage stage) {
+      int volumeByMB = getInputVolumeMB(parent, stage);
+      LOG.info(stage.getId() + ", Table's volume is approximately " + volumeByMB + " MB");
+      // determine the number of task
+      return (int) Math.ceil((double) volumeByMB /
+          stage.masterPlan.getContext().getInt(SessionVars.GROUPBY_PER_SHUFFLE_SIZE));
 
-        if (masterPlan.getContext().containsKey(SessionVars.TEST_MIN_TASK_NUM)) {
-          taskNum = masterPlan.getContext().getInt(SessionVars.TEST_MIN_TASK_NUM);
-          LOG.warn("!!!!! TESTCASE MODE !!!!!");
-        }
+    }
 
-        // The shuffle output numbers of join may be inconsistent by execution block order.
-        // Thus, we need to compare the number with DataChannel output numbers.
-        // If the number is right, the number and DataChannel output numbers will be consistent.
-        int outerShuffleOutputNum = 0, innerShuffleOutputNum = 0;
-        for (DataChannel eachChannel : masterPlan.getOutgoingChannels(outer.getId())) {
-          outerShuffleOutputNum = Math.max(outerShuffleOutputNum, eachChannel.getShuffleOutputNum());
-        }
-        for (DataChannel eachChannel : masterPlan.getOutgoingChannels(inner.getId())) {
-          innerShuffleOutputNum = Math.max(innerShuffleOutputNum, eachChannel.getShuffleOutputNum());
+    private static boolean hasGroupKeys(Stage currentStage, LogicalNode aggNode) {
+      if (aggNode.getType() == NodeType.GROUP_BY) {
+        return ((GroupbyNode)aggNode).getGroupingColumns().length > 0;
+      } else if (aggNode.getType() == NodeType.DISTINCT_GROUP_BY) {
+        // Find current distinct stage node.
+        DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(currentStage.getBlock().getPlan(),
+            NodeType.DISTINCT_GROUP_BY);
+        if (distinctNode == null) {
+          LOG.warn(currentStage.getId() + ", Can't find current DistinctGroupbyNode");
+          distinctNode = (DistinctGroupbyNode)aggNode;
         }
-        if (outerShuffleOutputNum != innerShuffleOutputNum
-            && taskNum != outerShuffleOutputNum
-            && taskNum != innerShuffleOutputNum) {
-          LOG.info(stage.getId() + ", Change determined number of join partitions cause difference of outputNum" +
-                  ", originTaskNum=" + taskNum + ", changedTaskNum=" + Math.max(outerShuffleOutputNum, innerShuffleOutputNum) +
-                  ", outerShuffleOutptNum=" + outerShuffleOutputNum +
-                  ", innerShuffleOutputNum=" + innerShuffleOutputNum);
-          taskNum = Math.max(outerShuffleOutputNum, innerShuffleOutputNum);
-        }
-
-        LOG.info(stage.getId() + ", The determined number of join partitions is " + taskNum);
-
-        return taskNum;
-        // Is this stage the first step of group-by?
-      } else if (grpNode != null) {
-        boolean hasGroupColumns = true;
-        if (grpNode.getType() == NodeType.GROUP_BY) {
-          hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0;
-        } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) {
-          // Find current distinct stage node.
-          DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(stage.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
-          if (distinctNode == null) {
-            LOG.warn(stage.getId() + ", Can't find current DistinctGroupbyNode");
-            distinctNode = (DistinctGroupbyNode)grpNode;
-          }
-          hasGroupColumns = distinctNode.getGroupingColumns().length > 0;
+        boolean hasGroupColumns = distinctNode.getGroupingColumns().length > 0;
 
-          Enforcer enforcer = stage.getBlock().getEnforcer();
-          if (enforcer == null) {
-            LOG.warn(stage.getId() + ", DistinctGroupbyNode's enforcer is null.");
-          }
+        Enforcer enforcer = currentStage.getBlock().getEnforcer();
+        if (enforcer == null) {
+          LOG.warn(currentStage.getId() + ", DistinctGroupbyNode's enforcer is null.");
+        } else {
           EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode);
           if (property != null) {
             if (property.getDistinct().getIsMultipleAggregation()) {
               MultipleAggregationStage multiAggStage = property.getDistinct().getMultipleAggregationStage();
-              if (multiAggStage != MultipleAggregationStage.THRID_STAGE) {
-                hasGroupColumns = true;
-              }
+              hasGroupColumns = multiAggStage != MultipleAggregationStage.THRID_STAGE;
             }
           }
         }
-        if (!hasGroupColumns) {
-          LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
-          return 1;
-        } else {
-          long volume = getInputVolume(stage.masterPlan, stage.context, stage.block);
-
-          int volumeByMB = (int) Math.ceil((double) volume / StorageUnit.MB);
-          LOG.info(stage.getId() + ", Table's volume is approximately " + volumeByMB + " MB");
-          // determine the number of task
-          int taskNum = (int) Math.ceil((double) volumeByMB /
-              masterPlan.getContext().getInt(SessionVars.GROUPBY_PER_SHUFFLE_SIZE));
-          LOG.info(stage.getId() + ", The determined number of aggregation partitions is " + taskNum);
-          return taskNum;
-        }
+        return hasGroupColumns;
       } else {
-        LOG.info("============>>>>> Unexpected Case! <<<<<================");
-        long volume = getInputVolume(stage.masterPlan, stage.context, stage.block);
-
-        int mb = (int) Math.ceil((double)volume / 1048576);
-        LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB");
-        // determine the number of task per 128MB
-        int taskNum = (int) Math.ceil((double)mb / 128);
-        LOG.info(stage.getId() + ", The determined number of partitions is " + taskNum);
-        return taskNum;
+        return ((WindowAggNode) aggNode).hasPartitionKeys();
       }
     }
 
+    private static int calculatePartitionNumDefault(ExecutionBlock parent, Stage currentStage) {
+      int mb = getInputVolumeMB(parent, currentStage);
+      LOG.info(currentStage.getId() + ", Table's volume is approximately " + mb + " MB");
+      // determine the number of tasks per 128 MB
+      return (int) Math.ceil((double)mb / 128);
+    }
+
+    private static int getInputVolumeMB(ExecutionBlock parent, Stage currentStage) {
+      // NOTE: Get input volume from the parent EB.
+      // If the parent EB contains a UNION query, the volume of the whole input for the UNION is returned.
+      // Otherwise, only the input volume of the current EB is returned.
+      long volume = getInputVolume(currentStage.masterPlan, currentStage.context, parent);
+
+      return (int) Math.ceil((double)volume / StorageUnit.MB);
+    }
+
     private static void schedule(Stage stage) throws IOException, TajoException {
       MasterPlan masterPlan = stage.getMasterPlan();
       ExecutionBlock execBlock = stage.getBlock();

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java
index 48affc5..59adfc5 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java
@@ -398,7 +398,7 @@ public class SelfDescSchemaBuildPhase extends LogicalPlanPreprocessPhase {
      * @param columns a set of columns
      * @return schema build from columns
      */
-    private Schema buildSchemaFromColumnSet(Set<Column> columns) {
+    private Schema buildSchemaFromColumnSet(Set<Column> columns) throws TajoException {
       SchemaGraph schemaGraph = new SchemaGraph();
       Set<ColumnVertex> rootVertexes = new HashSet<>();
       Schema schema = new Schema();

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index 239becc..6897e17 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -414,7 +414,7 @@ public class PlannerUtil {
    * @param type to find
    * @return a found logical node
    */
-  public static <T extends LogicalNode> T findMostBottomNode(LogicalNode node, NodeType type) {
+  public static <T extends LogicalNode> T findMostBottomNode(LogicalNode node, NodeType... type) {
     Preconditions.checkNotNull(node);
     Preconditions.checkNotNull(type);