You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/26 08:26:28 UTC

git commit: TAJO-205: Repartitioner occasionally chooses a partition number as one. (hyunsik)

Updated Branches:
  refs/heads/master 17287ef58 -> 9ca979223


TAJO-205: Repartitioner occasionally chooses a partition number as one. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/9ca97922
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/9ca97922
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/9ca97922

Branch: refs/heads/master
Commit: 9ca97922386ef2954016efd54e94fd81c2f31f91
Parents: 17287ef
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Sep 26 15:26:06 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Sep 26 15:26:06 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../java/org/apache/tajo/conf/TajoConf.java     |  11 +-
 .../apache/tajo/engine/planner/InsertNode.java  |   2 +-
 .../engine/planner/PhysicalPlannerImpl.java     |   8 +-
 .../apache/tajo/engine/planner/PlannerUtil.java |  27 +-
 .../tajo/engine/planner/global/MasterPlan.java  |   4 +
 .../engine/planner/physical/BNLJoinExec.java    |   3 +-
 .../planner/physical/ExternalSortExec.java      |   8 +-
 .../org/apache/tajo/master/GlobalPlanner.java   | 387 +++++++++++--------
 .../tajo/master/querymaster/Repartitioner.java  |  22 +-
 .../tajo/master/querymaster/SubQuery.java       |  15 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |   3 +-
 .../src/main/resources/tajo-default.xml         |   5 +
 .../tajo/engine/query/TestTableSubQuery.java    |  21 +-
 .../src/test/resources/tajo-default.xml         |   2 +-
 15 files changed, 305 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9400154..c6e88de 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -153,6 +153,9 @@ Release 0.2.0 - unreleased
 
   BUG FIXES
 
+    TAJO-205: Repartitioner occasionally chooses a partition number as one.
+    (hyunsik)
+
     TAJO-191: INSERT OVERWRITE INTO statement should follow the table meta in 
     catalog. (jinho)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 855e096..f429a26 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -107,19 +107,20 @@ public class TajoConf extends YarnConfiguration {
     //////////////////////////////////
     // Physical Executors
     //////////////////////////////////
-    EXT_SORT_BUFFER("tajo.extsort.buffer", 400000),
+    EXTENAL_SORT_BUFFER_NUM("tajo.sort.external.buffer", 1000000),
     BROADCAST_JOIN_THRESHOLD("tajo.join.broadcast.threshold", (long)5 * 1048576),
-    HASH_AGGREGATION_THRESHOLD("tajo.aggregation.hash.threshold", (long)512 * 1048576),
+    INMEMORY_HASH_TABLE_DEFAULT_SIZE("tajo.join.inmemory.table.num", (long)1000000),
+    INMEMORY_HASH_JOIN_THRESHOLD("tajo.join.memhash, threshold", (long)256 * 1048576),
+    INMEMORY_HASH_AGGREGATION_THRESHOLD("tajo.aggregation.hash.threshold", (long)256 * 1048576),
 
     //////////////////////////////////////////
     // Distributed Query Execution Parameters
     //////////////////////////////////////////
-    SORT_BUFFER_SIZE("tajo.sort.mb", 128),
     JOIN_TASK_VOLUME("tajo.join.task-volume.mb", 128),
-    SORT_TASK_VOLUME("tajo.sort.task-volume.mb", 256),
+    SORT_TASK_VOLUME("tajo.sort.task-volume.mb", 128),
     AGGREGATION_TASK_VOLUME("tajo.task-aggregation.volume.mb", 128),
 
-    JOIN_PARTITION_VOLUME("tajo.join.part-volume.mb", 64),
+    JOIN_PARTITION_VOLUME("tajo.join.part-volume.mb", 128),
     SORT_PARTITION_VOLUME("tajo.sort.part-volume.mb", 256),
     AGGREGATION_PARTITION_VOLUME("tajo.aggregation.part-volume.mb", 256),
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
index ae88630..4a3e479 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
@@ -160,7 +160,7 @@ public class InsertNode extends LogicalNode implements Cloneable {
     sb.append("INTO");
 
     if (hasTargetTable()) {
-      sb.append(targetTableDesc);
+      sb.append(targetTableDesc.getName());
     }
 
     if (hasPath()) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index ddedd80..c1a1b2f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -87,8 +87,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     DataChannel channel = context.getDataChannel();
     StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
     storeTableNode.setStorageType(CatalogProtos.StoreType.CSV);
-    storeTableNode.setInSchema(execPlan.getSchema());
-    storeTableNode.setOutSchema(execPlan.getSchema());
+    storeTableNode.setInSchema(plan.getOutSchema());
+    storeTableNode.setOutSchema(plan.getOutSchema());
     if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
       storeTableNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
     } else {
@@ -281,7 +281,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     long leftSize = estimateSizeRecursive(context, leftLineage);
     long rightSize = estimateSizeRecursive(context, rightLineage);
 
-    final long threshold = 1048576 * 128; // 64MB
+    final long threshold = conf.getLongVar(TajoConf.ConfVars.INMEMORY_HASH_JOIN_THRESHOLD);
 
     boolean hashJoin = false;
     if (leftSize < threshold || rightSize < threshold) {
@@ -412,7 +412,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
     String [] outerLineage = PlannerUtil.getLineage(groupbyNode.getChild());
     long estimatedSize = estimateSizeRecursive(context, outerLineage);
-    final long threshold = conf.getLongVar(TajoConf.ConfVars.HASH_AGGREGATION_THRESHOLD);
+    final long threshold = conf.getLongVar(TajoConf.ConfVars.INMEMORY_HASH_AGGREGATION_THRESHOLD);
 
     // if the relation size is less than the threshold,
     // the hash aggregation will be used.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 4907c40..60243bc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -452,16 +452,31 @@ public class PlannerUtil {
     return new SortSpec[][] {outerSortSpec, innerSortSpec};
   }
 
-  public static TupleComparator[] getComparatorsFromJoinQual(EvalNode joinQual, Schema outer, Schema inner) {
-    SortSpec[][] sortSpecs = getSortKeysFromJoinQual(joinQual, outer, inner);
+  public static TupleComparator[] getComparatorsFromJoinQual(EvalNode joinQual, Schema leftSchema, Schema rightSchema) {
+    SortSpec[][] sortSpecs = getSortKeysFromJoinQual(joinQual, leftSchema, rightSchema);
     TupleComparator [] comparators = new TupleComparator[2];
-    comparators[0] = new TupleComparator(outer, sortSpecs[0]);
-    comparators[1] = new TupleComparator(inner, sortSpecs[1]);
+    comparators[0] = new TupleComparator(leftSchema, sortSpecs[0]);
+    comparators[1] = new TupleComparator(rightSchema, sortSpecs[1]);
     return comparators;
   }
 
-  public static List<Column []> getJoinKeyPairs(EvalNode joinQual, Schema outer, Schema inner) {
-    JoinKeyPairFinder finder = new JoinKeyPairFinder(outer, inner);
+  /**
+   * @return the first array contains left table's columns, and the second array contains right table's columns.
+   */
+  public static Column [][] joinJoinKeyForEachTable(EvalNode joinQual, Schema leftSchema, Schema rightSchema) {
+    List<Column []> joinKeys = getJoinKeyPairs(joinQual, leftSchema, rightSchema);
+    Column [] leftColumns = new Column[joinKeys.size()];
+    Column [] rightColumns = new Column[joinKeys.size()];
+    for (int i = 0; i < joinKeys.size(); i++) {
+      leftColumns[i] = joinKeys.get(i)[0];
+      rightColumns[i] = joinKeys.get(i)[1];
+    }
+
+    return new Column[][] {leftColumns, rightColumns};
+  }
+
+  public static List<Column []> getJoinKeyPairs(EvalNode joinQual, Schema leftSchema, Schema rightSchema) {
+    JoinKeyPairFinder finder = new JoinKeyPairFinder(leftSchema, rightSchema);
     joinQual.preOrder(finder);
     return finder.getPairs();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index 2f4001e..6d30d00 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -170,6 +170,10 @@ public class MasterPlan {
     execBlockGraph.disconnect(src, target);
   }
 
+  public ExecutionBlock getParent(ExecutionBlock executionBlock) {
+    return execBlockMap.get(execBlockGraph.getParent(executionBlock.getId()));
+  }
+
   public List<ExecutionBlock> getChilds(ExecutionBlock execBlock) {
     return getChilds(execBlock.getId());
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index ba01b52..21f8749 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -60,8 +60,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
 
   public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan,
                      final PhysicalExec outer, PhysicalExec inner) {
-    super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
-        SchemaUtil.merge(outer.getSchema(), inner.getSchema()), outer, inner);
+    super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()), plan.getOutSchema(), outer, inner);
     this.plan = plan;
     this.joinQual = plan.getJoinQual();
     if (joinQual != null) { // if join type is not 'cross join'

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 8fcd527..4742136 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -42,7 +42,7 @@ public class ExternalSortExec extends SortExec {
 
   private final TableMeta meta;
   private final Path sortTmpDir;
-  private int SORT_BUFFER_SIZE;
+  private int MEM_TUPLE_NUM;
 
   public ExternalSortExec(final TaskAttemptContext context,
       final AbstractStorageManager sm, final SortNode plan, final PhysicalExec child)
@@ -50,8 +50,8 @@ public class ExternalSortExec extends SortExec {
     super(context, plan.getInSchema(), plan.getOutSchema(), child, plan.getSortKeys());
     this.plan = plan;
 
-    this.SORT_BUFFER_SIZE = context.getConf().getIntVar(ConfVars.EXT_SORT_BUFFER);
-    this.tupleSlots = new ArrayList<Tuple>(SORT_BUFFER_SIZE);
+    this.MEM_TUPLE_NUM = context.getConf().getIntVar(ConfVars.EXTENAL_SORT_BUFFER_NUM);
+    this.tupleSlots = new ArrayList<Tuple>(MEM_TUPLE_NUM);
 
     this.sortTmpDir = new Path(context.getWorkDir(), UUID.randomUUID().toString());
     this.localFS = FileSystem.getLocal(context.getConf());
@@ -96,7 +96,7 @@ public class ExternalSortExec extends SortExec {
     Tuple tuple;
     while ((tuple = child.next()) != null) { // partition sort start
       tupleSlots.add(new VTuple(tuple));
-      if (tupleSlots.size() == SORT_BUFFER_SIZE) {
+      if (tupleSlots.size() == MEM_TUPLE_NUM) {
         sortAndStoreChunk(chunkId, tupleSlots);
         chunkId++;
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
index 620e813..37c77b7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.DataChannel;
+import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.*;
@@ -53,7 +54,6 @@ public class GlobalPlanner {
     LogicalNode topmost;
     LogicalNode lastRepartionableNode;
     ExecutionBlock topMostLeftExecBlock;
-    ExecutionBlock topMostRightExecBlock;
   }
 
   /**
@@ -104,6 +104,29 @@ public class GlobalPlanner {
     LOG.info(masterPlan);
   }
 
+  private ExecutionBlock buildRepartitionBlocks(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode curNode,
+                                                LogicalNode childNode, ExecutionBlock lastChildBlock)
+      throws PlanningException {
+
+    ExecutionBlock currentBlock = null;
+    ExecutionBlock childBlock;
+    childBlock = lastChildBlock;
+
+    NodeType shuffleRequiredNodeType = lastDistNode.getType();
+    if (shuffleRequiredNodeType == NodeType.GROUP_BY) {
+      ExecutionBlock [] blocks = buildGroupBy(masterPlan, lastDistNode, curNode, childNode, childBlock);
+      currentBlock = blocks[0];
+    } else if (shuffleRequiredNodeType == NodeType.SORT) {
+      ExecutionBlock [] blocks = buildSortPlan(masterPlan, lastDistNode, curNode, childNode, childBlock);
+      currentBlock = blocks[0];
+    } else if (shuffleRequiredNodeType == NodeType.JOIN) {
+      ExecutionBlock [] blocks = buildJoinPlan(masterPlan, lastDistNode, childBlock, lastChildBlock);
+      currentBlock = blocks[0];
+    }
+
+    return currentBlock;
+  }
+  
   public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) {
     Preconditions.checkArgument(channel.getSchema() != null,
         "Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
@@ -112,6 +135,193 @@ public class GlobalPlanner {
     return new ScanNode(plan.newPID(), desc);
   }
 
+  private DataChannel createDataChannelFromJoin(ExecutionBlock leftBlock, ExecutionBlock rightBlock,
+                                                ExecutionBlock parent, JoinNode join, boolean leftTable) {
+    ExecutionBlock childBlock = leftTable ? leftBlock : rightBlock;
+
+    DataChannel channel = new DataChannel(childBlock, parent, HASH_PARTITION, 32);
+    if (join.getJoinType() != JoinType.CROSS) {
+      Column [][] joinColumns = PlannerUtil.joinJoinKeyForEachTable(join.getJoinQual(),
+          leftBlock.getPlan().getOutSchema(), rightBlock.getPlan().getOutSchema());
+      if (leftTable) {
+        channel.setPartitionKey(joinColumns[0]);
+      } else {
+        channel.setPartitionKey(joinColumns[1]);
+      }
+    }
+    return channel;
+  }
+
+  private ExecutionBlock [] buildJoinPlan(MasterPlan masterPlan, LogicalNode lastDistNode,
+                                          ExecutionBlock childBlock, ExecutionBlock lastChildBlock)
+      throws PlanningException {
+    ExecutionBlock currentBlock;
+
+    JoinNode joinNode = (JoinNode) lastDistNode;
+    LogicalNode leftNode = joinNode.getLeftChild();
+    LogicalNode rightNode = joinNode.getRightChild();
+
+    ExecutionBlock leftBlock;
+    if (lastChildBlock == null) {
+      leftBlock = masterPlan.newExecutionBlock();
+      leftBlock.setPlan(leftNode);
+    } else {
+      leftBlock = lastChildBlock;
+    }
+    ExecutionBlock rightBlock = masterPlan.newExecutionBlock();
+    rightBlock.setPlan(rightNode);
+
+    currentBlock = masterPlan.newExecutionBlock();
+
+    DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true);
+    DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, false);
+
+    ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel);
+    ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel);
+
+    joinNode.setLeftChild(leftScan);
+    joinNode.setRightChild(rightScan);
+    currentBlock.setPlan(joinNode);
+
+    masterPlan.addConnect(leftChannel);
+    masterPlan.addConnect(rightChannel);
+
+    return new ExecutionBlock[] { currentBlock, childBlock };
+  }
+
+  private ExecutionBlock [] buildGroupBy(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode currentNode,
+                                         LogicalNode childNode, ExecutionBlock childBlock) throws PlanningException {
+    ExecutionBlock currentBlock = null;
+    GroupbyNode groupByNode = (GroupbyNode) lastDistNode;
+
+    GroupbyNode firstGroupBy = PlannerUtil.transformGroupbyTo2P(groupByNode);
+    firstGroupBy.setHavingCondition(null);
+
+    if (firstGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
+        ((TableSubQueryNode)firstGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
+
+      UnionNode unionNode = PlannerUtil.findTopNode(groupByNode, NodeType.UNION);
+      ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
+      UnionsFinderContext finderContext = new UnionsFinderContext();
+      finder.visitChild(masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>(), finderContext);
+
+      currentBlock = masterPlan.newExecutionBlock();
+      GroupbyNode secondGroupBy = groupByNode;
+      for (UnionNode union : finderContext.unionList) {
+        TableSubQueryNode leftSubQuery = union.getLeftChild();
+        TableSubQueryNode rightSubQuery = union.getRightChild();
+        DataChannel dataChannel;
+        if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
+          ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+          GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
+          g1.setChild(leftSubQuery);
+          execBlock.setPlan(g1);
+          dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
+
+          ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
+          secondGroupBy.setChild(scanNode);
+          masterPlan.addConnect(dataChannel);
+        }
+        if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
+          ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+          GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
+          g1.setChild(rightSubQuery);
+          execBlock.setPlan(g1);
+          dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
+
+          ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
+          secondGroupBy.setChild(scanNode);
+          masterPlan.addConnect(dataChannel);
+        }
+      }
+      LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
+      if (parent instanceof UnaryNode && parent != secondGroupBy) {
+        ((UnaryNode)parent).setChild(secondGroupBy);
+      }
+      currentBlock.setPlan(currentNode);
+    } else {
+
+      if (childBlock == null) { // first repartition node
+        childBlock = masterPlan.newExecutionBlock();
+      }
+      childBlock.setPlan(firstGroupBy);
+
+      currentBlock = masterPlan.newExecutionBlock();
+
+      DataChannel channel;
+      if (firstGroupBy.isEmptyGrouping()) {
+        channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
+        channel.setPartitionKey(firstGroupBy.getGroupingColumns());
+      } else {
+        channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+        channel.setPartitionKey(firstGroupBy.getGroupingColumns());
+      }
+      channel.setSchema(firstGroupBy.getOutSchema());
+
+      GroupbyNode secondGroupBy = groupByNode;
+      ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+      secondGroupBy.setChild(scanNode);
+
+      LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
+      if (parent instanceof UnaryNode && parent != secondGroupBy) {
+        ((UnaryNode)parent).setChild(secondGroupBy);
+      }
+
+      masterPlan.addConnect(channel);
+      currentBlock.setPlan(currentNode);
+    }
+
+    return new ExecutionBlock [] {currentBlock, childBlock};
+  }
+
+  private ExecutionBlock [] buildSortPlan(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode currentNode,
+                                          LogicalNode childNode, ExecutionBlock childBlock) {
+    ExecutionBlock currentBlock = null;
+
+    SortNode firstSort = (SortNode) lastDistNode;
+    if (childBlock == null) {
+      childBlock = masterPlan.newExecutionBlock();
+    }
+    childBlock.setPlan(firstSort);
+
+    currentBlock = masterPlan.newExecutionBlock();
+    DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
+    channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(firstSort.getSortKeys()).toArray());
+    channel.setSchema(childNode.getOutSchema());
+
+    SortNode secondSort = PlannerUtil.clone(lastDistNode);
+    ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+    secondSort.setChild(secondScan);
+
+    LimitNode limitAndSort;
+    LimitNode limitOrNull = PlannerUtil.findTopNode(currentNode, NodeType.LIMIT);
+    if (limitOrNull != null) {
+      limitAndSort = PlannerUtil.clone(limitOrNull);
+      limitAndSort.setChild(firstSort);
+
+      if (childBlock.getPlan().getType() == NodeType.SORT) {
+        childBlock.setPlan(limitAndSort);
+      } else {
+        LogicalNode sortParent = PlannerUtil.findTopParentNode(childBlock.getPlan(), NodeType.SORT);
+        if (sortParent != null) {
+          if (sortParent instanceof UnaryNode) {
+            ((UnaryNode)sortParent).setChild(limitAndSort);
+          }
+        }
+      }
+    }
+
+    LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
+    if (parent instanceof UnaryNode && parent != secondSort) {
+      ((UnaryNode)parent).setChild(secondSort);
+    }
+
+    masterPlan.addConnect(channel);
+    currentBlock.setPlan(currentNode);
+
+    return new ExecutionBlock[] { currentBlock, childBlock };
+  }
+
   public class DistributedPlannerVisitor extends BasicLogicalPlanVisitor<GlobalPlanContext> {
 
     @Override
@@ -120,7 +330,7 @@ public class GlobalPlanner {
       super.visitRoot(plan, node, stack, context);
 
       if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() != NodeType.UNION) {
-        context.topMostLeftExecBlock = addChannel(context.plan, context.lastRepartionableNode, node, context.topmost, context.topMostLeftExecBlock);
+        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost, context.topMostLeftExecBlock);
       } else if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() == NodeType.UNION) {
 
       } else {
@@ -142,174 +352,13 @@ public class GlobalPlanner {
     }
 
     @Override
-    public LogicalNode visitLimit(LogicalPlan plan, LimitNode node, Stack<LogicalNode> stack, GlobalPlanContext context) throws PlanningException {
+    public LogicalNode visitLimit(LogicalPlan plan, LimitNode node, Stack<LogicalNode> stack, GlobalPlanContext context)
+        throws PlanningException {
       super.visitLimit(plan, node, stack, context);
       context.topmost = node;
       return node;
     }
 
-    private ExecutionBlock addChannel(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode curNode,
-                                      LogicalNode childNode, ExecutionBlock lastChildBlock) throws PlanningException {
-      ExecutionBlock currentBlock = null;
-      ExecutionBlock childBlock;
-
-      childBlock = lastChildBlock;
-
-      NodeType shuffleRequiredNodeType = lastDistNode.getType();
-      if (shuffleRequiredNodeType == NodeType.GROUP_BY) {
-        GroupbyNode groupByNode = (GroupbyNode) lastDistNode;
-
-        GroupbyNode firstGroupBy = PlannerUtil.transformGroupbyTo2P(groupByNode);
-        firstGroupBy.setHavingCondition(null);
-
-        if (firstGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
-            ((TableSubQueryNode)firstGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
-
-          UnionNode unionNode = PlannerUtil.findTopNode(groupByNode, NodeType.UNION);
-          ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
-          UnionsFinderContext finderContext = new UnionsFinderContext();
-          finder.visitChild(masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>(), finderContext);
-
-          currentBlock = masterPlan.newExecutionBlock();
-          GroupbyNode secondGroupBy = groupByNode;
-          for (UnionNode union : finderContext.unionList) {
-            TableSubQueryNode leftSubQuery = union.getLeftChild();
-            TableSubQueryNode rightSubQuery = union.getRightChild();
-            DataChannel dataChannel;
-            if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
-              ExecutionBlock execBlock = masterPlan.newExecutionBlock();
-              GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
-              g1.setChild(leftSubQuery);
-              execBlock.setPlan(g1);
-              dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
-
-              ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
-              secondGroupBy.setChild(scanNode);
-              masterPlan.addConnect(dataChannel);
-            }
-            if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
-              ExecutionBlock execBlock = masterPlan.newExecutionBlock();
-              GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
-              g1.setChild(rightSubQuery);
-              execBlock.setPlan(g1);
-              dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
-
-              ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
-              secondGroupBy.setChild(scanNode);
-              masterPlan.addConnect(dataChannel);
-            }
-          }
-          LogicalNode parent = PlannerUtil.findTopParentNode(curNode, lastDistNode.getType());
-          if (parent instanceof UnaryNode && parent != secondGroupBy) {
-            ((UnaryNode)parent).setChild(secondGroupBy);
-          }
-          currentBlock.setPlan(curNode);
-        } else {
-
-          if (childBlock == null) { // first repartition node
-            childBlock = masterPlan.newExecutionBlock();
-          }
-          childBlock.setPlan(firstGroupBy);
-
-          currentBlock = masterPlan.newExecutionBlock();
-
-          DataChannel channel;
-          if (firstGroupBy.isEmptyGrouping()) {
-            channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
-            channel.setPartitionKey(firstGroupBy.getGroupingColumns());
-          } else {
-            channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
-            channel.setPartitionKey(firstGroupBy.getGroupingColumns());
-          }
-          channel.setSchema(firstGroupBy.getOutSchema());
-
-          GroupbyNode secondGroupBy = groupByNode;
-          ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
-          secondGroupBy.setChild(scanNode);
-
-          LogicalNode parent = PlannerUtil.findTopParentNode(curNode, lastDistNode.getType());
-          if (parent instanceof UnaryNode && parent != secondGroupBy) {
-            ((UnaryNode)parent).setChild(secondGroupBy);
-          }
-
-          masterPlan.addConnect(channel);
-          currentBlock.setPlan(curNode);
-        }
-      } else if (shuffleRequiredNodeType == NodeType.SORT) {
-        SortNode firstSort = (SortNode) lastDistNode;
-        if (childBlock == null) {
-          childBlock = masterPlan.newExecutionBlock();
-        }
-        childBlock.setPlan(firstSort);
-
-        currentBlock = masterPlan.newExecutionBlock();
-        DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
-        channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(firstSort.getSortKeys()).toArray());
-        channel.setSchema(childNode.getOutSchema());
-
-        SortNode secondSort = PlannerUtil.clone(lastDistNode);
-        ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
-        secondSort.setChild(secondScan);
-
-        LimitNode limitAndSort;
-        LimitNode limitOrNull = PlannerUtil.findTopNode(curNode, NodeType.LIMIT);
-        if (limitOrNull != null) {
-          limitAndSort = PlannerUtil.clone(limitOrNull);
-          limitAndSort.setChild(firstSort);
-
-          if (childBlock.getPlan().getType() == NodeType.SORT) {
-            childBlock.setPlan(limitAndSort);
-          } else {
-            LogicalNode sortParent = PlannerUtil.findTopParentNode(childBlock.getPlan(), NodeType.SORT);
-            if (sortParent != null) {
-              if (sortParent instanceof UnaryNode) {
-                ((UnaryNode)sortParent).setChild(limitAndSort);
-              }
-            }
-          }
-        }
-
-        LogicalNode parent = PlannerUtil.findTopParentNode(curNode, lastDistNode.getType());
-        if (parent instanceof UnaryNode && parent != secondSort) {
-          ((UnaryNode)parent).setChild(secondSort);
-        }
-
-        masterPlan.addConnect(channel);
-        currentBlock.setPlan(curNode);
-      } else if (shuffleRequiredNodeType == NodeType.JOIN) {
-        JoinNode joinNode = (JoinNode) lastDistNode;
-        LogicalNode leftNode = joinNode.getLeftChild();
-        LogicalNode rightNode = joinNode.getRightChild();
-
-        ExecutionBlock leftBlock;
-        if (lastChildBlock == null) {
-          leftBlock = masterPlan.newExecutionBlock();
-          leftBlock.setPlan(leftNode);
-        } else {
-          leftBlock = lastChildBlock;
-        }
-        ExecutionBlock rightBlock = masterPlan.newExecutionBlock();
-        rightBlock.setPlan(rightNode);
-
-        currentBlock = masterPlan.newExecutionBlock();
-
-        DataChannel leftChannel = new DataChannel(leftBlock, currentBlock, HASH_PARTITION, 32);
-        DataChannel rightChannel = new DataChannel(rightBlock, currentBlock, HASH_PARTITION, 32);
-
-        ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel);
-        ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel);
-
-        joinNode.setLeftChild(leftScan);
-        joinNode.setRightChild(rightScan);
-        currentBlock.setPlan(joinNode);
-
-        masterPlan.addConnect(leftChannel);
-        masterPlan.addConnect(rightChannel);
-      }
-
-      return currentBlock;
-    }
-
     @Override
     public LogicalNode visitSort(LogicalPlan plan, SortNode node, Stack<LogicalNode> stack, GlobalPlanContext context)
         throws PlanningException {
@@ -317,7 +366,7 @@ public class GlobalPlanner {
       super.visitSort(plan, node, stack, context);
 
       if (context.lastRepartionableNode != null) {
-        context.topMostLeftExecBlock = addChannel(context.plan, context.lastRepartionableNode, node, context.topmost,
+        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost,
             context.topMostLeftExecBlock);
       }
 
@@ -333,7 +382,7 @@ public class GlobalPlanner {
       super.visitGroupBy(plan, node, stack, context);
 
       if (context.lastRepartionableNode != null) {
-        context.topMostLeftExecBlock = addChannel(context.plan, context.lastRepartionableNode, node, context.topmost,
+        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost,
             context.topMostLeftExecBlock);
       }
 
@@ -356,7 +405,7 @@ public class GlobalPlanner {
       super.visitJoin(plan, node, stack, context);
 
       if (context.lastRepartionableNode != null) {
-        context.topMostLeftExecBlock = addChannel(context.plan, context.lastRepartionableNode, node, context.topmost,
+        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost,
             context.topMostLeftExecBlock);
       }
 
@@ -372,7 +421,7 @@ public class GlobalPlanner {
       super.visitUnion(plan, node, stack, context);
 
       if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() != NodeType.UNION) {
-        context.topMostLeftExecBlock = addChannel(context.plan, context.lastRepartionableNode, node, context.topmost,
+        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost,
             context.topMostLeftExecBlock);
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 78c9e3e..7911925 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -594,22 +594,18 @@ public class Repartitioner {
     return hashed;
   }
 
-  public static SubQuery setPartitionNumberForTwoPhase(SubQuery subQuery, final int n, DataChannel channel) {
+  public static SubQuery setPartitionNumberForTwoPhase(SubQuery subQuery, final int desiredNum, DataChannel channel) {
     ExecutionBlock execBlock = subQuery.getBlock();
     Column[] keys = null;
     // if the next query is join,
     // set the partition number for the current logicalUnit
     // TODO: the union handling is required when a join has unions as its child
     MasterPlan masterPlan = subQuery.getMasterPlan();
-    ExecutionBlock parentBlock = execBlock.getParentBlock();
-    if (parentBlock != null) {
-      if (parentBlock.getStoreTableNode().getChild().getType() == NodeType.JOIN) {
-        execBlock.getStoreTableNode().setPartitions(execBlock.getPartitionType(),
-            execBlock.getStoreTableNode().getPartitionKeys(), n);
-        keys = execBlock.getStoreTableNode().getPartitionKeys();
-
-        masterPlan.getOutgoingChannels(subQuery.getId()).iterator().next()
-            .setPartition(execBlock.getPartitionType(), execBlock.getStoreTableNode().getPartitionKeys(), n);
+    keys = channel.getPartitionKey();
+    if (!masterPlan.isRoot(subQuery.getBlock()) ) {
+      ExecutionBlock parentBlock = masterPlan.getParent(subQuery.getBlock());
+      if (parentBlock.getPlan().getType() == NodeType.JOIN) {
+        channel.setPartitionNum(desiredNum);
       }
     }
 
@@ -631,9 +627,11 @@ public class Repartitioner {
     }
     if (keys != null) {
       if (keys.length == 0) {
-        channel.setPartition(execBlock.getPartitionType(), new Column[]{}, 1);
+        channel.setPartitionKey(new Column[] {});
+        channel.setPartitionNum(1);
       } else {
-        channel.setPartition(execBlock.getPartitionType(), keys, n);
+        channel.setPartitionKey(keys);
+        channel.setPartitionNum(desiredNum);
       }
     }
     return subQuery;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index a45d38c..254fd7c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -65,6 +65,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.tajo.conf.TajoConf.ConfVars;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
 
 
 /**
@@ -422,7 +423,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
           subQuery.finish();
           state = SubQueryState.SUCCEEDED;
         } else {
-          DataChannel channel = subQuery.getMasterPlan().getOutgoingChannels(subQuery.getId()).get(0);
+          ExecutionBlock parent = subQuery.getMasterPlan().getParent(subQuery.getBlock());
+          DataChannel channel = subQuery.getMasterPlan().getChannel(subQuery.getId(), parent.getId());
           setRepartitionIfNecessary(subQuery, channel);
           createTasks(subQuery);
 
@@ -458,7 +460,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
      * methods and the number of partitions to a given subquery.
      */
     private static void setRepartitionIfNecessary(SubQuery subQuery, DataChannel channel) {
-      if (subQuery.getBlock().hasParentBlock()) {
+      if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
         int numTasks = calculatePartitionNum(subQuery, channel);
         Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks, channel);
       }
@@ -473,7 +475,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
      */
     public static int calculatePartitionNum(SubQuery subQuery, DataChannel channel) {
       TajoConf conf = subQuery.context.getConf();
-      ExecutionBlock parent = subQuery.getBlock().getParentBlock();
+      MasterPlan masterPlan = subQuery.getMasterPlan();
+      ExecutionBlock parent = masterPlan.getParent(subQuery.getBlock());
 
       GroupbyNode grpNode = null;
       if (parent != null) {
@@ -482,14 +485,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
       // Is this subquery the first step of join?
       if (parent != null && parent.getScanNodes().length == 2) {
-        Iterator<ExecutionBlock> child = parent.getChildBlocks().iterator();
+        List<ExecutionBlock> childs = masterPlan.getChilds(parent);
 
         // for inner
-        ExecutionBlock outer = child.next();
+        ExecutionBlock outer = childs.get(0);
         long outerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, outer);
 
         // for inner
-        ExecutionBlock inner = child.next();
+        ExecutionBlock inner = childs.get(1);
         long innerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, inner);
         LOG.info("Outer volume: " + Math.ceil((double)outerVolume / 1048576));
         LOG.info("Inner volume: " + Math.ceil((double)innerVolume / 1048576));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index e70f780..ba5f342 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -133,13 +133,14 @@ enum TransmitType {
 message DataChannelProto {
   required ExecutionBlockIdProto srcId = 1;
   required ExecutionBlockIdProto targetId = 2;
+
   required TransmitType transmitType = 3 [default = PULL_TRANSMIT];
   required PartitionType partitionType = 4;
 
   optional SchemaProto schema = 5;
 
   repeated ColumnProto partitionKey = 7;
-  optional int32 partitionNum = 8 [default = 1];
+  optional int32 partitionNum = 9 [default = 1];
 
   optional StoreType storeType = 10 [default = CSV];
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
index a4e39ff..bf52919 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
+++ b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
@@ -42,6 +42,11 @@
   </property>
 
   <property>
+    <name>tajo.master.manager.addr</name>
+    <value>127.0.0.1:9004</value>
+  </property>
+
+  <property>
     <name>tajo.query.session.timeout</name>
     <value>60000</value>
     <description>ms</description>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
index 1635398..db886ba 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
@@ -18,19 +18,17 @@
 
 package org.apache.tajo.engine.query;
 
-import org.apache.tajo.IntegrationTest;
 import org.apache.tajo.TpchTestBase;
-import org.apache.tajo.client.ResultSetUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.*;
 
-@Category(IntegrationTest.class)
 public class TestTableSubQuery {
   private static TpchTestBase tpch;
   public TestTableSubQuery() throws IOException {
@@ -78,6 +76,19 @@ public class TestTableSubQuery {
         "FROM\n" +
         "(SELECT * FROM nation WHERE n_name LIKE 'A%') A " +
         "JOIN region B ON A.n_regionkey=B.r_regionkey");
-    System.out.println(ResultSetUtil.prettyFormat(res));
+
+    Map<String,String> expected = new HashMap<String, String>();
+    expected.put("ARGENTINA", "AMERICA");
+    expected.put("ALGERIA", "AFRICA");
+    try {
+      assertNotNull(res);
+      assertTrue(res.next());
+      assertTrue(expected.get(res.getString("n_name")).equals(res.getString("r_name")));
+      assertTrue(res.next());
+      assertTrue(expected.get(res.getString("n_name")).equals(res.getString("r_name")));
+      assertFalse(res.next());
+    } finally {
+      res.close();
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
index 7012cd9..a7a7f64 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
+++ b/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
@@ -48,7 +48,7 @@
 
   <property>
     <name>tajo.master.manager.addr</name>
-    <value>127.0.0.1</value>
+    <value>127.0.0.1:0</value>
   </property>
 
 </configuration>