You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/16 13:33:29 UTC

[2/7] TAJO-184: Refactor GlobalPlanner and global plan data structure. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 7f95a85..b8e2fc3 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -145,14 +145,14 @@ public class TestLogicalPlanner {
     testJsonSerDerObject(root);
 
     assertEquals(NodeType.PROJECTION, root.getChild().getType());
-    ProjectionNode projNode = (ProjectionNode) root.getChild();
+    ProjectionNode projNode = root.getChild();
 
     assertEquals(NodeType.SELECTION, projNode.getChild().getType());
-    SelectionNode selNode = (SelectionNode) projNode.getChild();
+    SelectionNode selNode = projNode.getChild();
 
     assertEquals(NodeType.SCAN, selNode.getChild().getType());
-    ScanNode scanNode = (ScanNode) selNode.getChild();
-    assertEquals("employee", scanNode.getTableId());
+    ScanNode scanNode = selNode.getChild();
+    assertEquals("employee", scanNode.getTableName());
   }
 
   public static void assertSchema(Schema expected, Schema schema) {
@@ -190,22 +190,17 @@ public class TestLogicalPlanner {
     }
 
     assertEquals(NodeType.PROJECTION, root.getChild().getType());
-    ProjectionNode projNode = (ProjectionNode) root.getChild();
+    ProjectionNode projNode = root.getChild();
 
     assertEquals(NodeType.JOIN, projNode.getChild().getType());
-    JoinNode joinNode = (JoinNode) projNode.getChild();
+    JoinNode joinNode = projNode.getChild();
 
     assertEquals(NodeType.SCAN, joinNode.getLeftChild().getType());
-    ScanNode leftNode = (ScanNode) joinNode.getLeftChild();
-    assertEquals("employee", leftNode.getTableId());
+    ScanNode leftNode = joinNode.getLeftChild();
+    assertEquals("employee", leftNode.getTableName());
     assertEquals(NodeType.SCAN, joinNode.getRightChild().getType());
-    ScanNode rightNode = (ScanNode) joinNode.getRightChild();
-    assertEquals("dept", rightNode.getTableId());
-    /*
-    LogicalNode optimized = LogicalOptimizer.optimize(expr, plan);
-    assertSchema(expectedSchema, optimized.getOutSchema());
-    */
-
+    ScanNode rightNode = joinNode.getRightChild();
+    assertEquals("dept", rightNode.getTableName());
 
     // three relations
     expr = sqlAnalyzer.parse(QUERIES[2]);
@@ -220,30 +215,27 @@ public class TestLogicalPlanner {
     root = (LogicalRootNode) plan;
 
     assertEquals(NodeType.PROJECTION, root.getChild().getType());
-    projNode = (ProjectionNode) root.getChild();
+    projNode = root.getChild();
 
     assertEquals(NodeType.JOIN, projNode.getChild().getType());
-    joinNode = (JoinNode) projNode.getChild();
+    joinNode = projNode.getChild();
 
     assertEquals(NodeType.JOIN, joinNode.getLeftChild().getType());
 
     assertEquals(NodeType.SCAN, joinNode.getRightChild().getType());
-    ScanNode scan1 = (ScanNode) joinNode.getRightChild();
-    assertEquals("score", scan1.getTableId());
+    ScanNode scan1 = joinNode.getRightChild();
+    assertEquals("score", scan1.getTableName());
 
-    JoinNode leftNode2 = (JoinNode) joinNode.getLeftChild();
+    JoinNode leftNode2 = joinNode.getLeftChild();
     assertEquals(NodeType.JOIN, leftNode2.getType());
 
     assertEquals(NodeType.SCAN, leftNode2.getLeftChild().getType());
-    ScanNode leftScan = (ScanNode) leftNode2.getLeftChild();
-    assertEquals("employee", leftScan.getTableId());
+    ScanNode leftScan = leftNode2.getLeftChild();
+    assertEquals("employee", leftScan.getTableName());
 
     assertEquals(NodeType.SCAN, leftNode2.getRightChild().getType());
-    ScanNode rightScan = (ScanNode) leftNode2.getRightChild();
-    assertEquals("dept", rightScan.getTableId());
-    /*
-    optimized = LogicalOptimizer.optimize(expr, plan);
-    assertSchema(expectedSchema, optimized.getOutSchema());*/
+    ScanNode rightScan = leftNode2.getRightChild();
+    assertEquals("dept", rightScan.getTableName());
   }
 
 
@@ -273,29 +265,24 @@ public class TestLogicalPlanner {
     assertEquals(NodeType.ROOT, plan.getType());
     LogicalRootNode root = (LogicalRootNode) plan;
     assertEquals(NodeType.PROJECTION, root.getChild().getType());
-    ProjectionNode proj = (ProjectionNode) root.getChild();
+    ProjectionNode proj = root.getChild();
     assertEquals(NodeType.JOIN, proj.getChild().getType());
-    JoinNode join = (JoinNode) proj.getChild();
+    JoinNode join = proj.getChild();
     assertEquals(JoinType.INNER, join.getJoinType());
     assertEquals(NodeType.SCAN, join.getRightChild().getType());
     assertTrue(join.hasJoinQual());
-    ScanNode scan = (ScanNode) join.getRightChild();
-    assertEquals("score", scan.getTableId());
+    ScanNode scan = join.getRightChild();
+    assertEquals("score", scan.getTableName());
 
     assertEquals(NodeType.JOIN, join.getLeftChild().getType());
-    join = (JoinNode) join.getLeftChild();
+    join = join.getLeftChild();
     assertEquals(JoinType.INNER, join.getJoinType());
     assertEquals(NodeType.SCAN, join.getLeftChild().getType());
-    ScanNode outer = (ScanNode) join.getLeftChild();
-    assertEquals("employee", outer.getTableId());
+    ScanNode outer = join.getLeftChild();
+    assertEquals("employee", outer.getTableName());
     assertEquals(NodeType.SCAN, join.getRightChild().getType());
-    ScanNode inner = (ScanNode) join.getRightChild();
-    assertEquals("dept", inner.getTableId());
-
-    /*
-    LogicalNode optimized = LogicalOptimizer.optimize(context, plan);
-    assertSchema(expectedJoinSchema, optimized.getOutSchema());
-    */
+    ScanNode inner = join.getRightChild();
+    assertEquals("dept", inner.getTableName());
   }
 
   @Test
@@ -309,29 +296,25 @@ public class TestLogicalPlanner {
     assertEquals(NodeType.ROOT, plan.getType());
     LogicalRootNode root = (LogicalRootNode) plan;
     assertEquals(NodeType.PROJECTION, root.getChild().getType());
-    ProjectionNode proj = (ProjectionNode) root.getChild();
+    ProjectionNode proj = root.getChild();
     assertEquals(NodeType.JOIN, proj.getChild().getType());
-    JoinNode join = (JoinNode) proj.getChild();
+    JoinNode join = proj.getChild();
     assertEquals(JoinType.INNER, join.getJoinType());
     assertEquals(NodeType.SCAN, join.getRightChild().getType());
-    ScanNode scan = (ScanNode) join.getRightChild();
-    assertEquals("score", scan.getTableId());
+    ScanNode scan = join.getRightChild();
+    assertEquals("score", scan.getTableName());
 
     assertEquals(NodeType.JOIN, join.getLeftChild().getType());
-    join = (JoinNode) join.getLeftChild();
+    join = join.getLeftChild();
     assertEquals(JoinType.INNER, join.getJoinType());
     assertEquals(NodeType.SCAN, join.getLeftChild().getType());
-    ScanNode outer = (ScanNode) join.getLeftChild();
-    assertEquals("employee", outer.getTableId());
+    ScanNode outer = join.getLeftChild();
+    assertEquals("employee", outer.getTableName());
     assertEquals(NodeType.SCAN, join.getRightChild().getType());
-    ScanNode inner = (ScanNode) join.getRightChild();
-    assertEquals("dept", inner.getTableId());
+    ScanNode inner = join.getRightChild();
+    assertEquals("dept", inner.getTableName());
     assertTrue(join.hasJoinQual());
     assertEquals(EvalType.EQUAL, join.getJoinQual().getType());
-    /*
-    LogicalNode optimized = LogicalOptimizer.optimize(expr, plan);
-    assertSchema(expectedJoinSchema, optimized.getOutSchema());
-    */
   }
 
   @Test
@@ -345,30 +328,25 @@ public class TestLogicalPlanner {
     assertEquals(NodeType.ROOT, plan.getType());
     LogicalRootNode root = (LogicalRootNode) plan;
     assertEquals(NodeType.PROJECTION, root.getChild().getType());
-    ProjectionNode proj = (ProjectionNode) root.getChild();
+    ProjectionNode proj = root.getChild();
     assertEquals(NodeType.JOIN, proj.getChild().getType());
-    JoinNode join = (JoinNode) proj.getChild();
+    JoinNode join = proj.getChild();
     assertEquals(JoinType.RIGHT_OUTER, join.getJoinType());
     assertEquals(NodeType.SCAN, join.getRightChild().getType());
-    ScanNode scan = (ScanNode) join.getRightChild();
-    assertEquals("score", scan.getTableId());
+    ScanNode scan = join.getRightChild();
+    assertEquals("score", scan.getTableName());
 
     assertEquals(NodeType.JOIN, join.getLeftChild().getType());
-    join = (JoinNode) join.getLeftChild();
+    join = join.getLeftChild();
     assertEquals(JoinType.LEFT_OUTER, join.getJoinType());
     assertEquals(NodeType.SCAN, join.getLeftChild().getType());
-    ScanNode outer = (ScanNode) join.getLeftChild();
-    assertEquals("employee", outer.getTableId());
+    ScanNode outer = join.getLeftChild();
+    assertEquals("employee", outer.getTableName());
     assertEquals(NodeType.SCAN, join.getRightChild().getType());
-    ScanNode inner = (ScanNode) join.getRightChild();
-    assertEquals("dept", inner.getTableId());
+    ScanNode inner = join.getRightChild();
+    assertEquals("dept", inner.getTableName());
     assertTrue(join.hasJoinQual());
     assertEquals(EvalType.EQUAL, join.getJoinQual().getType());
-
-    /*
-    LogicalNode optimized = LogicalOptimizer.optimize(context, plan);
-    assertSchema(expectedJoinSchema, optimized.getOutSchema());
-    */
   }
 
 
@@ -392,19 +370,19 @@ public class TestLogicalPlanner {
     root = (LogicalRootNode) plan;
 
     assertEquals(NodeType.PROJECTION, root.getChild().getType());
-    ProjectionNode projNode = (ProjectionNode) root.getChild();
+    ProjectionNode projNode = root.getChild();
     assertEquals(NodeType.GROUP_BY, projNode.getChild().getType());
-    GroupbyNode groupByNode = (GroupbyNode) projNode.getChild();
+    GroupbyNode groupByNode =  projNode.getChild();
 
     assertEquals(NodeType.JOIN, groupByNode.getChild().getType());
-    JoinNode joinNode = (JoinNode) groupByNode.getChild();
+    JoinNode joinNode = groupByNode.getChild();
 
     assertEquals(NodeType.SCAN, joinNode.getLeftChild().getType());
-    ScanNode leftNode = (ScanNode) joinNode.getLeftChild();
-    assertEquals("dept", leftNode.getTableId());
+    ScanNode leftNode = joinNode.getLeftChild();
+    assertEquals("dept", leftNode.getTableName());
     assertEquals(NodeType.SCAN, joinNode.getRightChild().getType());
-    ScanNode rightNode = (ScanNode) joinNode.getRightChild();
-    assertEquals("score", rightNode.getTableId());
+    ScanNode rightNode = joinNode.getRightChild();
+    assertEquals("score", rightNode.getTableName());
 
     //LogicalOptimizer.optimize(context, plan);
   }
@@ -418,9 +396,6 @@ public class TestLogicalPlanner {
     testJsonSerDerObject(plan);
     Schema expected = tpch.getOutSchema("q2");
     assertSchema(expected, plan.getOutSchema());
-//    LogicalNode optimized = LogicalOptimizer.optimize(context, plan);
-//    System.out.println(optimized);
-//    assertSchema(expected, optimized.getOutSchema());
   }
 
 
@@ -428,17 +403,17 @@ public class TestLogicalPlanner {
     assertEquals(NodeType.PROJECTION, plan.getType());
     ProjectionNode projNode = (ProjectionNode) plan;
     assertEquals(NodeType.GROUP_BY, projNode.getChild().getType());
-    GroupbyNode groupByNode = (GroupbyNode) projNode.getChild();
+    GroupbyNode groupByNode = projNode.getChild();
 
     assertEquals(NodeType.JOIN, groupByNode.getChild().getType());
-    JoinNode joinNode = (JoinNode) groupByNode.getChild();
+    JoinNode joinNode = groupByNode.getChild();
 
     assertEquals(NodeType.SCAN, joinNode.getLeftChild().getType());
-    ScanNode leftNode = (ScanNode) joinNode.getLeftChild();
-    assertEquals("dept", leftNode.getTableId());
+    ScanNode leftNode = joinNode.getLeftChild();
+    assertEquals("dept", leftNode.getTableName());
     assertEquals(NodeType.SCAN, joinNode.getRightChild().getType());
-    ScanNode rightNode = (ScanNode) joinNode.getRightChild();
-    assertEquals("score", rightNode.getTableId());
+    ScanNode rightNode = joinNode.getRightChild();
+    assertEquals("score", rightNode.getTableName());
   }
 
 
@@ -453,9 +428,8 @@ public class TestLogicalPlanner {
     LogicalRootNode root = (LogicalRootNode) plan;
 
     assertEquals(NodeType.STORE, root.getChild().getType());
-    StoreTableNode storeNode = (StoreTableNode) root.getChild();
+    StoreTableNode storeNode = root.getChild();
     testQuery7(storeNode.getChild());
-    //LogicalOptimizer.optimize(context, plan);
   }
 
   @Test
@@ -469,20 +443,20 @@ public class TestLogicalPlanner {
     LogicalRootNode root = (LogicalRootNode) plan;
 
     assertEquals(NodeType.PROJECTION, root.getChild().getType());
-    ProjectionNode projNode = (ProjectionNode) root.getChild();
+    ProjectionNode projNode = root.getChild();
 
     assertEquals(NodeType.SORT, projNode.getChild().getType());
-    SortNode sortNode = (SortNode) projNode.getChild();
+    SortNode sortNode = projNode.getChild();
 
     assertEquals(NodeType.JOIN, sortNode.getChild().getType());
-    JoinNode joinNode = (JoinNode) sortNode.getChild();
+    JoinNode joinNode = sortNode.getChild();
 
     assertEquals(NodeType.SCAN, joinNode.getLeftChild().getType());
-    ScanNode leftNode = (ScanNode) joinNode.getLeftChild();
-    assertEquals("dept", leftNode.getTableId());
+    ScanNode leftNode = joinNode.getLeftChild();
+    assertEquals("dept", leftNode.getTableName());
     assertEquals(NodeType.SCAN, joinNode.getRightChild().getType());
-    ScanNode rightNode = (ScanNode) joinNode.getRightChild();
-    assertEquals("score", rightNode.getTableId());
+    ScanNode rightNode = joinNode.getRightChild();
+    assertEquals("score", rightNode.getTableName());
   }
 
   @Test
@@ -496,10 +470,10 @@ public class TestLogicalPlanner {
     LogicalRootNode root = (LogicalRootNode) plan;
 
     assertEquals(NodeType.PROJECTION, root.getChild().getType());
-    ProjectionNode projNode = (ProjectionNode) root.getChild();
+    ProjectionNode projNode = root.getChild();
 
     assertEquals(NodeType.LIMIT, projNode.getChild().getType());
-    LimitNode limitNode = (LimitNode) projNode.getChild();
+    LimitNode limitNode = projNode.getChild();
 
     assertEquals(NodeType.SORT, limitNode.getChild().getType());
   }
@@ -514,21 +488,12 @@ public class TestLogicalPlanner {
     assertEquals(NodeType.ROOT, plan.getType());
     LogicalRootNode root = (LogicalRootNode) plan;
     assertEquals(NodeType.PROJECTION, root.getChild().getType());
-    ProjectionNode projNode = (ProjectionNode) root.getChild();
+    ProjectionNode projNode = root.getChild();
     assertEquals(NodeType.SELECTION, projNode.getChild().getType());
-    SelectionNode selNode = (SelectionNode) projNode.getChild();
+    SelectionNode selNode = projNode.getChild();
     assertEquals(NodeType.SCAN, selNode.getChild().getType());
-    ScanNode scanNode = (ScanNode) selNode.getChild();
-    assertEquals(scanNode.getTableId(), "employee");
-
-    /*
-    LogicalNode optimized = LogicalOptimizer.optimize(expr, plan);
-    assertEquals(NodeType.ROOT, optimized.getType());
-    root = (LogicalRootNode) optimized;
-
-    assertEquals(NodeType.SCAN, root.getSubNode().getType());
-    scanNode = (ScanNode) root.getSubNode();
-    assertEquals("employee", scanNode.getTableId());*/
+    ScanNode scanNode = selNode.getChild();
+    assertEquals(scanNode.getTableName(), "employee");
   }
 
 
@@ -625,7 +590,6 @@ public class TestLogicalPlanner {
 
     expr = sqlAnalyzer.parse(ALIAS[1]);
     plan = planner.createPlan(expr).getRootBlock().getRoot();
-//    plan = LogicalOptimizer.optimize(expr, plan);
     root = (LogicalRootNode) plan;
 
     finalSchema = root.getOutSchema();
@@ -662,7 +626,7 @@ public class TestLogicalPlanner {
     LogicalRootNode root = (LogicalRootNode) plan;
     testJsonSerDerObject(root);
     assertEquals(NodeType.CREATE_TABLE, root.getChild().getType());
-    CreateTableNode createTable = (CreateTableNode) root.getChild();
+    CreateTableNode createTable = root.getChild();
 
     Schema def = createTable.getSchema();
     assertEquals("name", def.getColumn(0).getColumnName());
@@ -749,26 +713,26 @@ public class TestLogicalPlanner {
 
     LogicalRootNode root = (LogicalRootNode) plan;
     assertEquals(NodeType.PROJECTION, root.getChild().getType());
-    ProjectionNode projNode = (ProjectionNode) root.getChild();
+    ProjectionNode projNode = root.getChild();
     assertEquals(NodeType.UNION, projNode.getChild().getType());
-    UnionNode u0 = (UnionNode) projNode.getChild();
+    UnionNode u0 = projNode.getChild();
     assertEquals(NodeType.GROUP_BY, u0.getLeftChild().getType());
     assertEquals(NodeType.UNION, u0.getRightChild().getType());
-    GroupbyNode grp = (GroupbyNode) u0.getLeftChild();
+    GroupbyNode grp = u0.getLeftChild();
     cuboids.add(Sets.newHashSet(grp.getGroupingColumns()));
 
-    UnionNode u1 = (UnionNode) u0.getRightChild();
+    UnionNode u1 = u0.getRightChild();
     assertEquals(NodeType.GROUP_BY, u1.getLeftChild().getType());
     assertEquals(NodeType.UNION, u1.getRightChild().getType());
-    grp = (GroupbyNode) u1.getLeftChild();
+    grp = u1.getLeftChild();
     cuboids.add(Sets.newHashSet(grp.getGroupingColumns()));
 
-    UnionNode u2 = (UnionNode) u1.getRightChild();
+    UnionNode u2 = u1.getRightChild();
     assertEquals(NodeType.GROUP_BY, u2.getLeftChild().getType());
-    grp = (GroupbyNode) u2.getRightChild();
+    grp = u2.getRightChild();
     cuboids.add(Sets.newHashSet(grp.getGroupingColumns()));
     assertEquals(NodeType.GROUP_BY, u2.getRightChild().getType());
-    grp = (GroupbyNode) u2.getLeftChild();
+    grp = u2.getLeftChild();
     cuboids.add(Sets.newHashSet(grp.getGroupingColumns()));
 
     assertEquals((int)Math.pow(2, 2), cuboids.size());
@@ -791,50 +755,48 @@ public class TestLogicalPlanner {
     assertEquals(NodeType.ROOT, plan.getType());
     LogicalRootNode root = (LogicalRootNode) plan;
     assertEquals(NodeType.UNION, root.getChild().getType());
-    UnionNode union = (UnionNode) root.getChild();
-    assertEquals(NodeType.PROJECTION, union.getLeftChild().getType());
-    ProjectionNode projL = (ProjectionNode) union.getLeftChild();
-    assertEquals(NodeType.SELECTION, projL.getChild().getType());
-    assertEquals(NodeType.PROJECTION, union.getRightChild().getType());
-    ProjectionNode projR = (ProjectionNode) union.getRightChild();
-    assertEquals(NodeType.SELECTION, projR.getChild().getType());
+    UnionNode union = root.getChild();
+    assertEquals(NodeType.TABLE_SUBQUERY, union.getLeftChild().getType());
+    TableSubQueryNode leftSub = union.getLeftChild();
+    assertEquals(NodeType.PROJECTION, leftSub.getSubQuery().getType());
+    assertEquals(NodeType.TABLE_SUBQUERY, union.getRightChild().getType());
+    TableSubQueryNode rightSub = union.getRightChild();
+    assertEquals(NodeType.PROJECTION, rightSub.getSubQuery().getType());
   }
 
   @Test
   public final void testSetPlan2() throws PlanningException {
     // for testing multiple set statements
     Expr expr = sqlAnalyzer.parse(setStatements[1]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
-    testJsonSerDerObject(plan);
-    assertEquals(NodeType.ROOT, plan.getType());
-    LogicalRootNode root = (LogicalRootNode) plan;
+    LogicalPlan plan = planner.createPlan(expr);
+    System.out.println(plan);
+    LogicalRootNode root = plan.getRootBlock().getRoot();
+    testJsonSerDerObject(root);
+    assertEquals(NodeType.ROOT, root.getType());
     assertEquals(NodeType.UNION, root.getChild().getType());
-    UnionNode union = (UnionNode) root.getChild();
-    assertEquals(NodeType.PROJECTION, union.getLeftChild().getType());
-    assertEquals(NodeType.INTERSECT, union.getRightChild().getType());
-    IntersectNode intersect = (IntersectNode) union.getRightChild();
-    assertEquals(NodeType.PROJECTION, intersect.getLeftChild().getType());
-    assertEquals(NodeType.PROJECTION, intersect.getRightChild().getType());
+    UnionNode union = root.getChild();
+    assertEquals(NodeType.TABLE_SUBQUERY, union.getLeftChild().getType());
+    assertEquals(NodeType.TABLE_SUBQUERY, union.getRightChild().getType());
+    TableSubQueryNode subQuery = union.getRightChild();
+    assertEquals(NodeType.INTERSECT, subQuery.getSubQuery().getType());
   }
 
   @Test
   public final void testSetPlan3() throws PlanningException {
     // for testing multiple set statements
     Expr expr = sqlAnalyzer.parse(setStatements[2]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
-    testJsonSerDerObject(plan);
-    assertEquals(NodeType.ROOT, plan.getType());
-    LogicalRootNode root = (LogicalRootNode) plan;
+    LogicalPlan plan = planner.createPlan(expr);
+    LogicalRootNode root = plan.getRootBlock().getRoot();
+    testJsonSerDerObject(root);
+    assertEquals(NodeType.ROOT, root.getType());
     assertEquals(NodeType.EXCEPT, root.getChild().getType());
-    ExceptNode except = (ExceptNode) root.getChild();
-    assertEquals(NodeType.UNION, except.getLeftChild().getType());
-    assertEquals(NodeType.INTERSECT, except.getRightChild().getType());
-    UnionNode union = (UnionNode) except.getLeftChild();
-    assertEquals(NodeType.PROJECTION, union.getLeftChild().getType());
-    assertEquals(NodeType.PROJECTION, union.getRightChild().getType());
-    IntersectNode intersect = (IntersectNode) except.getRightChild();
-    assertEquals(NodeType.PROJECTION, intersect.getLeftChild().getType());
-    assertEquals(NodeType.PROJECTION, intersect.getRightChild().getType());
+    ExceptNode except = root.getChild();
+    assertEquals(NodeType.TABLE_SUBQUERY, except.getLeftChild().getType());
+    assertEquals(NodeType.TABLE_SUBQUERY, except.getRightChild().getType());
+    TableSubQueryNode leftSubQuery = except.getLeftChild();
+    TableSubQueryNode rightSubQuery = except.getRightChild();
+    assertEquals(NodeType.UNION, leftSubQuery.getSubQuery().getType());
+    assertEquals(NodeType.INTERSECT, rightSubQuery.getSubQuery().getType());
   }
 
   static final String [] setQualifiers = {
@@ -851,7 +813,7 @@ public class TestLogicalPlanner {
     assertEquals(NodeType.ROOT, plan.getType());
     LogicalRootNode root = (LogicalRootNode) plan;
     assertEquals(NodeType.PROJECTION, root.getChild().getType());
-    ProjectionNode projectionNode = (ProjectionNode) root.getChild();
+    ProjectionNode projectionNode = root.getChild();
     assertEquals(NodeType.SCAN, projectionNode.getChild().getType());
 
     context = sqlAnalyzer.parse(setQualifiers[1]);
@@ -860,7 +822,7 @@ public class TestLogicalPlanner {
     assertEquals(NodeType.ROOT, plan.getType());
     root = (LogicalRootNode) plan;
     assertEquals(NodeType.PROJECTION, root.getChild().getType());
-    projectionNode = (ProjectionNode) root.getChild();
+    projectionNode = root.getChild();
     assertEquals(NodeType.GROUP_BY, projectionNode.getChild().getType());
 
     context = sqlAnalyzer.parse(setQualifiers[2]);
@@ -868,7 +830,7 @@ public class TestLogicalPlanner {
     testJsonSerDerObject(plan);
     root = (LogicalRootNode) plan;
     assertEquals(NodeType.PROJECTION, root.getChild().getType());
-    projectionNode = (ProjectionNode) root.getChild();
+    projectionNode = root.getChild();
     assertEquals(NodeType.SCAN, projectionNode.getChild().getType());
   }
 
@@ -962,8 +924,8 @@ public class TestLogicalPlanner {
   }
 
   private static InsertNode getInsertNode(LogicalPlan plan) {
-    LogicalRootNode root = (LogicalRootNode) plan.getRootBlock().getRoot();
+    LogicalRootNode root = plan.getRootBlock().getRoot();
     assertEquals(NodeType.INSERT, root.getChild().getType());
-    return (InsertNode) root.getChild();
+    return root.getChild();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index 4821751..46fd648 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -110,27 +110,6 @@ public class TestPlannerUtil {
     System.out.println(root);
   }
   
-  @Test
-  public final void testTrasformTwoPhaseWithStore() throws PlanningException {
-    Expr expr = analyzer.parse(TestLogicalPlanner.QUERIES[9]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
-    
-    assertEquals(NodeType.ROOT, plan.getType());
-    UnaryNode unary = (UnaryNode) plan;
-    assertEquals(NodeType.PROJECTION, unary.getChild().getType());
-    ProjectionNode proj = (ProjectionNode) unary.getChild();
-    assertEquals(NodeType.GROUP_BY, proj.getChild().getType());
-    GroupbyNode groupby = (GroupbyNode) proj.getChild();
-    unary = (UnaryNode) PlannerUtil.transformGroupbyTo2PWithStore(
-        groupby, "test");
-    assertEquals(NodeType.STORE, unary.getChild().getType());
-    unary = (UnaryNode) unary.getChild();
-    
-    assertEquals(groupby.getInSchema(), unary.getOutSchema());
-    
-    assertEquals(NodeType.GROUP_BY, unary.getChild().getType());
-  }
-  
   private final class TwoPhaseBuilder implements LogicalNodeVisitor {
     @Override
     public void visit(LogicalNode node) {
@@ -158,10 +137,10 @@ public class TestPlannerUtil {
 
     assertEquals(NodeType.SCAN, joinNode.getLeftChild().getType());
     ScanNode leftNode = (ScanNode) joinNode.getLeftChild();
-    assertEquals("employee", leftNode.getTableId());
+    assertEquals("employee", leftNode.getTableName());
     assertEquals(NodeType.SCAN, joinNode.getRightChild().getType());
     ScanNode rightNode = (ScanNode) joinNode.getRightChild();
-    assertEquals("dept", rightNode.getTableId());
+    assertEquals("dept", rightNode.getTableName());
     
     LogicalNode node = PlannerUtil.findTopNode(root, NodeType.ROOT);
     assertEquals(NodeType.ROOT, node.getType());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
deleted file mode 100644
index 7572ad5..0000000
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.global;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.eval.TestEvalTree.TestSum;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.LogicalOptimizer;
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PlanningException;
-import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.master.ExecutionBlock;
-import org.apache.tajo.master.GlobalPlanner;
-import org.apache.tajo.storage.*;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestGlobalQueryOptimizer {
-  private static TajoTestingCluster util;
-  private static TajoConf conf;
-  private static CatalogService catalog;
-  private static GlobalPlanner planner;
-  private static Schema schema;
-  private static SQLAnalyzer analyzer;
-  private static LogicalPlanner logicalPlanner;
-  private static LogicalOptimizer logicalOptimizer;
-  private static QueryId queryId;
-  private static GlobalOptimizer optimizer;
-
-  @BeforeClass
-  public static void setup() throws Exception {
-    util = new TajoTestingCluster();
-    util.startCatalogCluster();
-    int i, j;
-
-    schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("age", Type.INT4);
-    schema.addColumn("name", Type.TEXT);
-    schema.addColumn("salary", Type.INT4);
-
-    TableMeta meta;
-
-    conf = new TajoConf(util.getConfiguration());
-    catalog = util.getMiniCatalogCluster().getCatalog();
-    AbstractStorageManager sm = StorageManagerFactory.getStorageManager(util.getConfiguration());
-    FunctionDesc funcDesc = new FunctionDesc("sumtest", TestSum.class, FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4));
-    catalog.registerFunction(funcDesc);
-    FileSystem fs = sm.getFileSystem();
-
-    AsyncDispatcher dispatcher = new AsyncDispatcher();
-
-    planner = new GlobalPlanner(conf, sm,
-        dispatcher.getEventHandler());
-    analyzer = new SQLAnalyzer();
-    logicalPlanner = new LogicalPlanner(catalog);
-    logicalOptimizer = new LogicalOptimizer();
-
-    int tbNum = 2;
-    int tupleNum;
-    Appender appender;
-    Tuple t = new VTuple(4);
-    t.put(new Datum[] {
-        DatumFactory.createInt4(1), DatumFactory.createInt4(32),
-        DatumFactory.createText("h"), DatumFactory.createInt4(10)});
-
-    for (i = 0; i < tbNum; i++) {
-      meta = CatalogUtil.newTableMeta((Schema) schema.clone(), StoreType.CSV);
-      meta.putOption(CSVFile.DELIMITER, ",");
-
-      Path dataRoot = sm.getBaseDir();
-      Path tablePath = StorageUtil.concatPath(dataRoot, "table"+i, "file.csv");
-      if (fs.exists(tablePath.getParent())) {
-        fs.delete(tablePath.getParent(), true);
-      }
-      fs.mkdirs(tablePath.getParent());
-      appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
-      appender.init();
-      tupleNum = 100;
-      for (j = 0; j < tupleNum; j++) {
-        appender.addTuple(t);
-      }
-      appender.close();
-
-      TableDesc desc = CatalogUtil
-          .newTableDesc("table" + i, (TableMeta) meta.clone(), sm.getTablePath("table" + i));
-      catalog.addTable(desc);
-    }
-
-    //QueryIdFactory.reset();
-    queryId = QueryIdFactory.newQueryId();
-    optimizer = new GlobalOptimizer();
-  }
-
-  @AfterClass
-  public static void terminate() throws IOException {
-    util.shutdownCatalogCluster();
-  }
-
-  @Test
-  public void testReduceLogicalQueryUnitSteps() throws IOException, PlanningException {
-    Expr expr = analyzer.parse(
-        "select table0.age,table0.salary,table1.salary from table0,table1 where table0.salary = table1.salary order by table0.age");
-    LogicalPlan plan = logicalPlanner.createPlan(expr);
-    LogicalNode rootNode = logicalOptimizer.optimize(plan);
-
-    MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) rootNode);
-    globalPlan = optimizer.optimize(globalPlan);
-
-    ExecutionBlock unit = globalPlan.getRoot();
-    StoreTableNode store = unit.getStoreTableNode();
-    assertEquals(NodeType.PROJECTION, store.getChild().getType());
-    ProjectionNode proj = (ProjectionNode) store.getChild();
-    assertEquals(NodeType.SORT, proj.getChild().getType());
-    SortNode sort = (SortNode) proj.getChild();
-    assertEquals(NodeType.SCAN, sort.getChild().getType());
-    ScanNode scan = (ScanNode) sort.getChild();
-
-    assertTrue(unit.hasChildBlock());
-    unit = unit.getChildBlock(scan);
-    store = unit.getStoreTableNode();
-    assertEquals(NodeType.SORT, store.getChild().getType());
-    sort = (SortNode) store.getChild();
-    assertEquals(NodeType.JOIN, sort.getChild().getType());
-
-    assertTrue(unit.hasChildBlock());
-    for (ScanNode prevscan : unit.getScanNodes()) {
-      ExecutionBlock prev = unit.getChildBlock(prevscan);
-      store = prev.getStoreTableNode();
-      assertEquals(NodeType.SCAN, store.getChild().getType());
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
new file mode 100644
index 0000000..cc5a001
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.ExecutionBlock;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestMasterPlan {
+  // Exercises MasterPlan.addConnect/disconnect and checks the outcome via isConnected/isReverseConnected.
+  @Test
+  public void testConnect() {
+    MasterPlan masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
+    // Three execution blocks created through the plan under test.
+    ExecutionBlock eb1 = masterPlan.newExecutionBlock();
+    ExecutionBlock eb2 = masterPlan.newExecutionBlock();
+    ExecutionBlock eb3 = masterPlan.newExecutionBlock();
+    // Connect (eb1, eb2): isConnected is queried in the same order, isReverseConnected in swapped order.
+    masterPlan.addConnect(eb1, eb2, TajoWorkerProtocol.PartitionType.LIST_PARTITION);
+    assertTrue(masterPlan.isConnected(eb1.getId(), eb2.getId()));
+    assertTrue(masterPlan.isReverseConnected(eb2.getId(), eb1.getId()));
+    // Connect (eb3, eb2) as well; the earlier (eb1, eb2) connection must still be reported.
+    masterPlan.addConnect(eb3, eb2, TajoWorkerProtocol.PartitionType.LIST_PARTITION);
+    assertTrue(masterPlan.isConnected(eb1.getId(), eb2.getId()));
+    assertTrue(masterPlan.isConnected(eb3.getId(), eb2.getId()));
+    // Both connections must also be reported by the reverse lookup from eb2.
+    assertTrue(masterPlan.isReverseConnected(eb2.getId(), eb1.getId()));
+    assertTrue(masterPlan.isReverseConnected(eb2.getId(), eb3.getId()));
+    // After disconnect(eb3, eb2), neither lookup may report that pair any more.
+    masterPlan.disconnect(eb3, eb2);
+    assertFalse(masterPlan.isConnected(eb3, eb2));
+    assertFalse(masterPlan.isReverseConnected(eb2, eb3));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index b714981..beec1ea 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
@@ -140,7 +141,7 @@ public class TestBNLJoinExec {
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLCrossJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
-        TUtil.newQueryUnitAttemptId(), merged, workDir);
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr expr = analyzer.parse(QUERIES[0]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
 
@@ -175,7 +176,7 @@ public class TestBNLJoinExec {
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLInnerJoin");
     TaskAttemptContext ctx =
-        new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(),
+        new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),
             merged, workDir);
     Expr context = analyzer.parse(QUERIES[1]);
     LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 8021882..1fc82c0 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
@@ -40,7 +41,6 @@ import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.TUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -94,7 +94,7 @@ public class TestBSTIndexExec {
     this.idxSchema = new Schema();
     idxSchema.addColumn("managerId", Type.INT4);
     SortSpec[] sortKeys = new SortSpec[1];
-    sortKeys[0] = new SortSpec(idxSchema.getColumn("managerId"), true, false);
+    sortKeys[0] = new SortSpec(idxSchema.getColumnByFQN("managerId"), true, false);
     this.comp = new TupleComparator(idxSchema, sortKeys);
 
     this.writer = new BSTIndex(conf).getIndexWriter(idxPath,
@@ -159,7 +159,7 @@ public class TestBSTIndexExec {
     Fragment[] frags = StorageManager.splitNG(conf, "employee", meta, tablePath, Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEqual");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
-        TUtil.newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir);
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir);
     Expr expr = analyzer.parse(QUERY);
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
@@ -189,10 +189,10 @@ public class TestBSTIndexExec {
     @Override
     public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode)
         throws IOException {
-      Preconditions.checkNotNull(ctx.getTable(scanNode.getTableId()),
-          "Error: There is no table matched to %s", scanNode.getTableId());
+      Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()),
+          "Error: There is no table matched to %s", scanNode.getTableName());
 
-      Fragment[] fragments = ctx.getTables(scanNode.getTableId());
+      Fragment[] fragments = ctx.getTables(scanNode.getTableName());
       
       Datum[] datum = new Datum[]{DatumFactory.createInt4(rndKey)};
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 01fd370..864c776 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
@@ -33,7 +34,6 @@ import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.TUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -112,7 +112,7 @@ public class TestExternalSortExec {
         Integer.MAX_VALUE);
     Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
-        TUtil.newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir);
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir);
     Expr expr = analyzer.parse(QUERIES[0]);
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = plan.getRootBlock().getRoot();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 886dddc..94de6bc 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
@@ -147,7 +148,7 @@ public class TestHashAntiJoinExec {
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashAntiJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
-        TUtil.newQueryUnitAttemptId(), merged, workDir);
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr expr = analyzer.parse(QUERIES[0]);
     LogicalPlan plan = planner.createPlan(expr);
     optimizer.optimize(plan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index cf89cf8..0654042 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
@@ -140,7 +141,7 @@ public class TestHashJoinExec {
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashInnerJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
-        TUtil.newQueryUnitAttemptId(), merged, workDir);
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr expr = analyzer.parse(QUERIES[0]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index d986a8f..1d0d139 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
@@ -151,7 +152,7 @@ public class TestHashSemiJoinExec {
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashSemiJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
-        TUtil.newQueryUnitAttemptId(), merged, workDir);
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr expr = analyzer.parse(QUERIES[0]);
     LogicalPlan plan = planner.createPlan(expr);
     optimizer.optimize(plan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index e77a734..f130187 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
@@ -156,7 +157,7 @@ public class TestMergeJoinExec {
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testMergeInnerJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
-        TUtil.newQueryUnitAttemptId(), merged, workDir);
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr expr = analyzer.parse(QUERIES[0]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index 2d82f6c..5b2f658 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
@@ -33,6 +34,7 @@ import org.apache.tajo.engine.planner.LogicalPlanner;
 import org.apache.tajo.engine.planner.PhysicalPlanner;
 import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
 import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -59,6 +61,8 @@ public class TestNLJoinExec {
   private TableDesc employee;
   private TableDesc people;
 
+  private MasterPlan masterPlan;
+
   @Before
   public void setUp() throws Exception {
     util = new TajoTestingCluster();
@@ -118,6 +122,8 @@ public class TestNLJoinExec {
     catalog.addTable(people);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
+
+    masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
   }
 
   @After
@@ -142,7 +148,7 @@ public class TestNLJoinExec {
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLCrossJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
-        TUtil.newQueryUnitAttemptId(), merged, workDir);
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr context = analyzer.parse(QUERIES[0]);
     LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
 
@@ -170,7 +176,7 @@ public class TestNLJoinExec {
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLInnerJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
-        TUtil.newQueryUnitAttemptId(), merged, workDir);
+        LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), merged, workDir);
     Expr context =  analyzer.parse(QUERIES[1]);
     LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
     //LogicalOptimizer.optimize(ctx, plan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 5358d3a..aa8e4d7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -25,9 +25,7 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.*;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -41,8 +39,8 @@ import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.master.ExecutionBlock.PartitionType;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.index.bst.BSTIndex;
@@ -60,6 +58,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
 import static org.junit.Assert.*;
 
 public class TestPhysicalPlanner {
@@ -75,6 +74,8 @@ public class TestPhysicalPlanner {
   private static TableDesc employee = null;
   private static TableDesc score = null;
 
+  private static MasterPlan masterPlan;
+
   @BeforeClass
   public static void setUp() throws Exception {
     util = new TajoTestingCluster();
@@ -148,6 +149,8 @@ public class TestPhysicalPlanner {
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
     optimizer = new LogicalOptimizer();
+
+    masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
   }
 
   @AfterClass
@@ -180,8 +183,7 @@ public class TestPhysicalPlanner {
     Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
         employee.getPath(), Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanPlan");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil
-        .newQueryUnitAttemptId(),
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] { frags[0] }, workDir);
     Expr expr = analyzer.parse(QUERIES[0]);
     LogicalPlan plan = planner.createPlan(expr);
@@ -210,8 +212,7 @@ public class TestPhysicalPlanner {
     Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
         employee.getPath(), Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanWithFilterPlan");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil
-        .newQueryUnitAttemptId(),
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] { frags[0] }, workDir);
     Expr expr = analyzer.parse(QUERIES[16]);
     LogicalPlan plan = planner.createPlan(expr);
@@ -238,7 +239,7 @@ public class TestPhysicalPlanner {
     Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByPlan");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(),
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] { frags[0] }, workDir);
     Expr context = analyzer.parse(QUERIES[7]);
     LogicalPlan plan = planner.createPlan(context);
@@ -268,7 +269,7 @@ public class TestPhysicalPlanner {
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(
         "target/test-data/testHashGroupByPlanWithALLField");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(),
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] { frags[0] }, workDir);
     Expr expr = analyzer.parse(QUERIES[15]);
     LogicalPlan plan = planner.createPlan(expr);
@@ -296,7 +297,7 @@ public class TestPhysicalPlanner {
     Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testSortGroupByPlan");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(),
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[]{frags[0]}, workDir);
     Expr context = analyzer.parse(QUERIES[7]);
     LogicalPlan plan = planner.createPlan(context);
@@ -353,7 +354,7 @@ public class TestPhysicalPlanner {
     Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(),
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] { frags[0] },
         workDir);
     ctx.setOutputPath(new Path(workDir, "grouped1"));
@@ -394,7 +395,7 @@ public class TestPhysicalPlanner {
     Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlanWithRCFile");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(),
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] { frags[0] },
         workDir);
     ctx.setOutputPath(new Path(workDir, "grouped2"));
@@ -433,10 +434,9 @@ public class TestPhysicalPlanner {
   public final void testPartitionedStorePlan() throws IOException, PlanningException {
     Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
-    QueryUnitAttemptId id = TUtil.newQueryUnitAttemptId();
+    QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testPartitionedStorePlan");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new Fragment[] { frags[0] },
-        workDir);
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new Fragment[] { frags[0] }, workDir);
     Expr context = analyzer.parse(QUERIES[7]);
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = plan.getRootBlock().getRoot();
@@ -444,13 +444,13 @@ public class TestPhysicalPlanner {
     int numPartitions = 3;
     Column key1 = new Column("score.deptName", Type.TEXT);
     Column key2 = new Column("score.class", Type.TEXT);
-    StoreTableNode storeNode = new StoreTableNode("partition");
-    storeNode.setPartitions(PartitionType.HASH, new Column[]{key1, key2}, numPartitions);
-    PlannerUtil.insertNode(rootNode, storeNode);
+    DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
+        PartitionType.HASH_PARTITION, numPartitions);
+    dataChannel.setPartitionKey(new Column[]{key1, key2});
+    ctx.setDataChannel(dataChannel);
     rootNode = optimizer.optimize(plan);
 
-    TableMeta outputMeta = CatalogUtil.newTableMeta(rootNode.getOutSchema(),
-        StoreType.CSV);
+    TableMeta outputMeta = CatalogUtil.newTableMeta(rootNode.getOutSchema(), StoreType.CSV);
 
     FileSystem fs = sm.getFileSystem();
 
@@ -492,7 +492,7 @@ public class TestPhysicalPlanner {
       throws IOException, PlanningException {
     Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
-    QueryUnitAttemptId id = TUtil.newQueryUnitAttemptId();
+    QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan);
 
     Path workDir = CommonTestingUtil.getTestDir(
         "target/test-data/testPartitionedStorePlanWithEmptyGroupingSet");
@@ -502,9 +502,10 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = plan.getRootBlock().getRoot();
     int numPartitions = 1;
-    StoreTableNode storeNode = new StoreTableNode("emptyset");
-    storeNode.setPartitions(PartitionType.HASH, new Column[] {}, numPartitions);
-    PlannerUtil.insertNode(rootNode, storeNode);
+    DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
+        PartitionType.HASH_PARTITION, numPartitions);
+    dataChannel.setPartitionKey(new Column[]{});
+    ctx.setDataChannel(dataChannel);
     optimizer.optimize(plan);
 
     TableMeta outputMeta = CatalogUtil.newTableMeta(rootNode.getOutSchema(),
@@ -549,14 +550,14 @@ public class TestPhysicalPlanner {
     Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testAggregationFunction");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(),
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] { frags[0] }, workDir);
     Expr context = analyzer.parse(QUERIES[8]);
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = optimizer.optimize(plan);
 
     // Set all aggregation functions to the first phase mode
-    GroupbyNode groupbyNode = (GroupbyNode) PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
+    GroupbyNode groupbyNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
     for (Target target : groupbyNode.getTargets()) {
       for (EvalNode eval : EvalTreeUtil.findDistinctAggFunction(target.getEvalTree())) {
         if (eval instanceof AggFuncCallEval) {
@@ -582,7 +583,7 @@ public class TestPhysicalPlanner {
     Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCountFunction");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(),
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] { frags[0] }, workDir);
     Expr context = analyzer.parse(QUERIES[9]);
     LogicalPlan plan = planner.createPlan(context);
@@ -613,7 +614,7 @@ public class TestPhysicalPlanner {
     Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByWithNullValue");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(),
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] { frags[0] }, workDir);
     Expr context = analyzer.parse(QUERIES[11]);
     LogicalPlan plan = planner.createPlan(context);
@@ -636,7 +637,7 @@ public class TestPhysicalPlanner {
     Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testUnionPlan");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(),
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] { frags[0] }, workDir);
     Expr  context = analyzer.parse(QUERIES[0]);
     LogicalPlan plan = planner.createPlan(context);
@@ -660,7 +661,7 @@ public class TestPhysicalPlanner {
   @Test
   public final void testEvalExpr() throws IOException, PlanningException {
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEvalExpr");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(),
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] { }, workDir);
     Expr expr = analyzer.parse(QUERIES[12]);
     LogicalPlan plan = planner.createPlan(expr);
@@ -696,7 +697,7 @@ public class TestPhysicalPlanner {
     Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateIndex");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(),
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] {frags[0]}, workDir);
     Expr context = analyzer.parse(createIndexStmt[0]);
     LogicalPlan plan = planner.createPlan(context);
@@ -723,7 +724,7 @@ public class TestPhysicalPlanner {
         Integer.MAX_VALUE);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testDuplicateEliminate");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(),
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] {frags[0]}, workDir);
     Expr expr = analyzer.parse(duplicateElimination[0]);
     LogicalPlan plan = planner.createPlan(expr);
@@ -755,23 +756,28 @@ public class TestPhysicalPlanner {
         employee.getPath(), Integer.MAX_VALUE);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testIndexedStoreExec");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil.newQueryUnitAttemptId(),
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
         new Fragment[] {frags[0]}, workDir);
     Expr context = analyzer.parse(SORT_QUERY[0]);
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = optimizer.optimize(plan);
 
+    SortNode sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
+    DataChannel channel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
+        PartitionType.RANGE_PARTITION);
+    channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()).toArray());
+    ctx.setDataChannel(channel);
+
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
-    ProjectionExec proj = (ProjectionExec) exec;
-    ExternalSortExec sort = (ExternalSortExec) proj.getChild();
-
-    SortSpec[] sortSpecs = sort.getPlan().getSortKeys();
-    IndexedStoreExec idxStoreExec = new IndexedStoreExec(ctx, sm, sort, sort.getSchema(), sort.getSchema(), sortSpecs);
+//    ProjectionExec proj = (ProjectionExec) exec;
+//    ExternalSortExec sort = (ExternalSortExec) proj.getChild();
+//
+//    SortSpec[] sortSpecs = sort.getPlan().getSortKeys();
+    //IndexedStoreExec idxStoreExec = new IndexedStoreExec(ctx, sm, sort, sort.getSchema(), sort.getSchema(), sortSpecs);
 
     Tuple tuple;
-    exec = idxStoreExec;
     exec.init();
     exec.next();
     exec.close();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 06c5bb7..c655e05 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
@@ -33,7 +34,6 @@ import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.TUtil;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -109,7 +109,7 @@ public class TestSortExec {
   public final void testNext() throws IOException, PlanningException {
     Fragment [] frags = sm.splitNG(conf, "employee", employeeMeta, tablePath, Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestSortExec");
-    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility
         .newQueryUnitAttemptId(),
         new Fragment[] { frags[0] }, workDir);
     Expr context = analyzer.parse(QUERIES[0]);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index cc497a8..a11cdbf 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.engine.query;
 
+import org.apache.tajo.client.ResultSetUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -61,6 +62,25 @@ public class TestGroupByQuery {
   }
 
   @Test
+  public final void testComplexParameterWithSubQuery() throws Exception {
+    // The subquery is lineitem UNION ALL lineitem; count(*) over it is expected to be 10.
+    ResultSet res = tpch.execute(
+        "select count(*) as total from ("+
+            "        select * from lineitem " +
+            "        union all"+
+            "        select * from lineitem ) l");
+    // Checked before the try block so a null result is not masked by an NPE from close() in finally.
+    assertNotNull(res);
+    try {
+      assertTrue(res.next());
+      assertEquals(10, (int) res.getDouble("total"));
+      assertFalse(res.next());
+    } finally {
+      res.close();
+    }
+  }
+
+  @Test
   public final void testComplexParameter2() throws Exception {
     ResultSet res = tpch.execute("select count(*) + max(l_orderkey) as merged from lineitem");
     try {
@@ -71,6 +91,8 @@ public class TestGroupByQuery {
     }
   }
 
+
+
   //@Test
   public final void testCube() throws Exception {
     ResultSet res = tpch.execute(
@@ -85,19 +107,4 @@ public class TestGroupByQuery {
       res.close();
     }
   }
-
-  //@Test
-  // TODO - to fix the limit processing and then enable it
-  public final void testGroupByLimit() throws Exception {
-    ResultSet res = tpch.execute("select l_orderkey from lineitem limit 2");
-    try {
-      int count = 0;
-      for (;res.next();) {
-        count++;
-      }
-      assertEquals(2, count);
-    } finally {
-      res.close();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
index d006679..0182067 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
@@ -103,7 +103,7 @@ public class TestResultSetImpl {
     assertEquals(schema.getColumnNum(), meta.getColumnCount());
     for (int i = 0; i < meta.getColumnCount(); i++) {
       assertEquals(schema.getColumn(i).getColumnName(), meta.getColumnName(i + 1));
-      assertEquals(schema.getColumn(i).getTableName(), meta.getTableName(i + 1));
+      assertEquals(schema.getColumn(i).getQualifier(), meta.getTableName(i + 1));
       assertEquals(schema.getColumn(i).getDataType().getClass().getCanonicalName(),
           meta.getColumnTypeName(i + 1));
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
index 6c10888..df6eaa0 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
@@ -400,9 +400,11 @@ public class TestSelectQuery {
       for (;res.next();) {
         count++;
       }
+      assertEquals(10, count);
     } finally {
       res.close();
     }
+
   }
 
   @Test
@@ -417,8 +419,7 @@ public class TestSelectQuery {
     assertEquals(5, orderKeys.getMeta().getStat().getNumRows().intValue());
   }
 
-  //@Test
-  // TODO - fix and enable this unit test
+  @Test
   public final void testLimit() throws Exception {
     ResultSet res = tpch.execute("select l_orderkey from lineitem limit 3");
     try {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
new file mode 100644
index 0000000..e445089
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.client.ResultSetUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+
+import static org.junit.Assert.*;
+
+/** Integration tests that run queries with a table subquery in the FROM clause through TpchTestBase. */
+@Category(IntegrationTest.class)
+public class TestTableSubQuery {
+  private static TpchTestBase tpch;
+  public TestTableSubQuery() throws IOException {
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    tpch = TpchTestBase.getInstance();
+  }
+
+  @Test
+  public final void testTableSubquery1() throws Exception {
+    ResultSet res = tpch.execute("select l_orderkey from (select * from lineitem) as l");
+    assertNotNull(res);
+    try {
+      int count = 0;
+      while (res.next()) {
+        count++;
+      }
+      assertEquals(5, count);
+    } finally {
+      res.close();
+    }
+  }
+
+  @Test
+  public final void testGroupBySubQuery() throws Exception {
+    ResultSet res = tpch.execute(
+        "select sum(l_extendedprice * l_discount) as revenue from (select * from lineitem) as l");
+    assertNotNull(res);
+    try {
+      assertTrue(res.next());
+      assertEquals(12908, (int) res.getDouble("revenue"));
+      assertFalse(res.next());
+    } finally {
+      res.close();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index b5ce437..6ffc532 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -16,7 +16,7 @@ package org.apache.tajo.master;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.benchmark.TPCH;
@@ -73,7 +73,7 @@ public class TestExecutionBlockCursor {
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
-    planner = new GlobalPlanner(conf, sm, dispatcher.getEventHandler());
+    planner = new GlobalPlanner(conf, sm);
   }
 
   public static void tearDown() {
@@ -90,8 +90,10 @@ public class TestExecutionBlockCursor {
             "join partsupp on s_suppkey = ps_suppkey " +
             "join part on p_partkey = ps_partkey and p_type like '%BRASS' and p_size = 15");
     LogicalPlan logicalPlan = logicalPlanner.createPlan(context);
-    LogicalNode rootNode = optimizer.optimize(logicalPlan);
-    MasterPlan plan = planner.build(QueryIdFactory.newQueryId(), (LogicalRootNode) rootNode);
+    optimizer.optimize(logicalPlan);
+    QueryContext queryContext = new QueryContext();
+    MasterPlan plan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), queryContext, logicalPlan);
+    planner.build(plan);
 
     ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan);
 
@@ -101,7 +103,7 @@ public class TestExecutionBlockCursor {
       count++;
     }
 
-    // 4 input relations, 4 join, and 1 projection = 9 execution blocks
-    assertEquals(9, count);
+    // 10 execution blocks are expected; the listed breakdown (4 input relations, 4 joins, 1 terminal) accounts for only 9 -- TODO confirm the 10th
+    assertEquals(10, count);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
new file mode 100644
index 0000000..1e89ce5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.benchmark.TPCH;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalOptimizer;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.util.FileUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+/** Smoke tests for GlobalPlanner: each test passes if a MasterPlan can be built for its query without throwing. */
+public class TestGlobalPlanner {
+  private static TajoTestingCluster util;
+  private static CatalogService catalog;
+  private static SQLAnalyzer sqlAnalyzer;
+  private static LogicalPlanner planner;
+  private static LogicalOptimizer optimizer;
+  private static TPCH tpch;
+  private static GlobalPlanner globalPlanner;
+
+  // NOTE(review): this class never shuts down the catalog cluster it starts; consider an @AfterClass hook.
+  @BeforeClass
+  public static void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.startCatalogCluster();
+    catalog = util.getMiniCatalogCluster().getCatalog();
+    for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
+      catalog.registerFunction(funcDesc);
+    }
+    // TPC-H Schema for Complex Queries
+    String [] tpchTables = {
+        "part", "supplier", "partsupp", "nation", "region", "lineitem", "orders", "customer"
+    };
+    tpch = new TPCH();
+    tpch.loadSchemas();
+    tpch.loadOutSchema();
+    for (String table : tpchTables) {
+      TableMeta m = CatalogUtil.newTableMeta(tpch.getSchema(table), CatalogProtos.StoreType.CSV);
+      TableDesc d = CatalogUtil.newTableDesc(table, m, new Path("file:///"));
+      catalog.addTable(d);
+    }
+
+    sqlAnalyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+    optimizer = new LogicalOptimizer();
+    globalPlanner = new GlobalPlanner(util.getConfiguration(),
+        StorageManagerFactory.getStorageManager(util.getConfiguration()));
+  }
+
+  /** Parses {@code sql}, creates its logical plan, runs the optimizer and then the global planner on a new MasterPlan. */
+  private MasterPlan buildPlan(String sql) throws PlanningException, IOException {
+    Expr expr = sqlAnalyzer.parse(sql);
+    LogicalPlan plan = planner.createPlan(expr);
+    optimizer.optimize(plan);
+    QueryContext context = new QueryContext();
+    MasterPlan masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), context, plan);
+    globalPlanner.build(masterPlan);
+    return masterPlan;
+  }
+
+  @Test
+  public void testSelectDistinct() throws Exception {
+    buildPlan("select distinct l_orderkey from lineitem");
+  }
+
+  @Test
+  public void testSortAfterGroupBy() throws Exception {
+    buildPlan("select max(l_quantity) as max_quantity, l_orderkey from lineitem group by l_orderkey order by max_quantity");
+  }
+
+  @Test
+  public void testSortLimit() throws Exception {
+    buildPlan("select max(l_quantity) as max_quantity, l_orderkey from lineitem group by l_orderkey order by max_quantity limit 3");
+  }
+
+  @Test
+  public void testJoin() throws Exception {
+    buildPlan("select n_name, r_name, n_regionkey, r_regionkey from nation, region");
+  }
+
+  @Test
+  public void testMultipleJoin() throws Exception {
+    buildPlan(FileUtil.readTextFile(new File("src/test/queries/tpch_q2_simplified.tql")));
+  }
+
+  @Test
+  public void testUnion() throws IOException, PlanningException {
+    buildPlan("select o_custkey as num from orders union select c_custkey as num from customer union select p_partkey as num from part");
+  }
+
+  @Test
+  public void testSubQuery() throws IOException, PlanningException {
+    buildPlan("select l.l_orderkey from (select * from lineitem) l");
+  }
+
+  @Test
+  public void testSubQueryJoin() throws IOException, PlanningException {
+    buildPlan("select l.l_orderkey from (select * from lineitem) l join (select * from orders) o on l.l_orderkey = o.o_orderkey");
+  }
+
+  @Test
+  public void testSubQueryGroupBy() throws IOException, PlanningException {
+    buildPlan("select sum(l_extendedprice*l_discount) as revenue from (select * from lineitem) as l");
+  }
+
+  @Test
+  public void testSubQueryGroupBy2() throws IOException, PlanningException {
+    buildPlan("select l_orderkey, sum(l_extendedprice*l_discount)  as revenue from (select * from lineitem) as l group by l_orderkey");
+  }
+
+  @Test
+  public void testSubQuerySortAfterGroup() throws IOException, PlanningException {
+    buildPlan("select l_orderkey, sum(l_extendedprice*l_discount)  as revenue from (select * from lineitem) as l group by l_orderkey order by l_orderkey");
+  }
+
+  // NOTE(review): despite its name, the query in this test contains no ORDER BY clause.
+  @Test
+  public void testSubQuerySortAfterGroupMultiBlocks() throws IOException, PlanningException {
+    buildPlan(
+        "select l_orderkey, revenue from (" +
+          "select l_orderkey, sum(l_extendedprice*l_discount) as revenue from lineitem group by l_orderkey"
+        +") l1"
+    );
+  }
+
+  @Test
+  public void testSubQuerySortAfterGroupMultiBlocks2() throws IOException, PlanningException {
+    buildPlan(
+        "select l_orderkey, revenue from (" +
+          "select l_orderkey, revenue from (" +
+              "select l_orderkey, sum(l_extendedprice*l_discount) as revenue from lineitem group by l_orderkey"
+              +") l1" +
+          ") l2 order by l_orderkey"
+    );
+  }
+
+  @Test
+  public void testComplexUnion1() throws Exception {
+    buildPlan(FileUtil.readTextFile(new File("src/test/queries/complex_union_1.sql")));
+  }
+
+  @Test
+  public void testComplexUnion2() throws Exception {
+    buildPlan(FileUtil.readTextFile(new File("src/test/queries/complex_union_2.sql")));
+  }
+
+  @Test
+  public void testUnionGroupBy1() throws Exception {
+    buildPlan("select l_orderkey, sum(l_extendedprice*l_discount) as revenue from (" +
+        "select * from lineitem " +
+        "union " +
+        "select * from lineitem ) l group by l_orderkey");
+  }
+}