You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/04/27 03:30:57 UTC

[1/2] TAJO-42: Divide SubQuery into FSM and execution block parts. (hyunsik)

Updated Branches:
  refs/heads/master 6cf96bd4e -> fef3dd509


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SchedulerUtils.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SchedulerUtils.java
deleted file mode 100644
index 4abeefb..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SchedulerUtils.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import com.google.common.collect.Lists;
-
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-public class SchedulerUtils {
-
-  public static class MapComparatorBySize implements Comparator<Map>{
-
-    @Override
-    public int compare(Map m1, Map m2) {
-      return m1.size() - m2.size();
-    }
-  }
-
-  public static List<Map> sortListOfMapsBySize(
-      final List<Map> maplist) {
-    Map[] arr = new Map[maplist.size()];
-    arr = maplist.toArray(arr);
-    Arrays.sort(arr, new MapComparatorBySize());
-
-    List<Map> newlist = Lists.newArrayList(arr);
-    return newlist;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
index 754f267..ff73334 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
@@ -22,14 +22,15 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.state.*;
+import tajo.QueryIdFactory;
 import tajo.QueryUnitId;
 import tajo.SubQueryId;
 import tajo.catalog.*;
@@ -43,6 +44,7 @@ import tajo.engine.planner.PlannerUtil;
 import tajo.engine.planner.logical.*;
 import tajo.master.QueryMaster.QueryContext;
 import tajo.master.event.*;
+import tajo.storage.Fragment;
 import tajo.storage.StorageManager;
 import tajo.util.IndexUtil;
 
@@ -57,38 +59,20 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static tajo.conf.TajoConf.ConfVars;
 
 
+/**
+ * SubQuery is an instance of an ExecutionBlock.
+ */
 public class SubQuery implements EventHandler<SubQueryEvent> {
 
   private static final Log LOG = LogFactory.getLog(SubQuery.class);
 
-  public enum PARTITION_TYPE {
-    /** for hash partitioning */
-    HASH,
-    LIST,
-    /** for map-side join */
-    BROADCAST,
-    /** for range partitioning */
-    RANGE
-  }
-
-  private SubQueryId id;
-  private LogicalNode plan = null;
-  private StoreTableNode store = null;
-  private List<ScanNode> scanlist = null;
-  private SubQuery next;
-  private Map<ScanNode, SubQuery> childSubQueries;
-  private PARTITION_TYPE outputType;
-  private boolean hasJoinPlan;
-  private boolean hasUnionPlan;
+  private ExecutionBlock block;
   private Priority priority;
   private TableStat stats;
   EventHandler eventHandler;
   final StorageManager sm;
-  private final GlobalPlanner planner;
-  private boolean isLeafQuery = false;
   TaskSchedulerImpl taskScheduler;
   QueryContext queryContext;
-  private Clock clock;
 
   private long startTime;
   private long finishTime;
@@ -106,7 +90,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       new StateMachineFactory <SubQuery, SubQueryState,
           SubQueryEventType, SubQueryEvent> (SubQueryState.NEW)
 
-          .addTransition(SubQueryState.NEW, EnumSet.of(SubQueryState.INIT, SubQueryState.FAILED, SubQueryState.SUCCEEDED),
+          .addTransition(SubQueryState.NEW,
+              EnumSet.of(SubQueryState.INIT, SubQueryState.FAILED, SubQueryState.SUCCEEDED),
               SubQueryEventType.SQ_INIT, new InitAndRequestContainer())
 
           .addTransition(SubQueryState.INIT, SubQueryState.CONTAINER_ALLOCATED,
@@ -148,20 +133,15 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
   private int completedTaskCount = 0;
 
-  public SubQuery(SubQueryId id, StorageManager sm, GlobalPlanner planner) {
-    this.id = id;
-    childSubQueries = new HashMap<ScanNode, SubQuery>();
-    scanlist = new ArrayList<ScanNode>();
-    hasJoinPlan = false;
-    hasUnionPlan = false;
+  public SubQuery(QueryContext context, ExecutionBlock block, StorageManager sm) {
+    this.queryContext = context;
+    this.block = block;
     this.sm = sm;
-    this.planner = planner;
+    this.eventHandler = context.getEventHandler();
 
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
     this.readLock = readWriteLock.readLock();
     this.writeLock = readWriteLock.writeLock();
-
-
     stateMachine = stateMachineFactory.make(this);
   }
 
@@ -190,72 +170,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
   }
 
-  public void setQueryContext(QueryContext context) {
-    this.queryContext = context;
-  }
-
-  public void setClock(Clock clock) {
-    this.clock = clock;
-  }
-
-  public void setEventHandler(EventHandler eventHandler) {
-    this.eventHandler = eventHandler;
-  }
-
-  public boolean isLeafQuery() {
-    return this.isLeafQuery;
-  }
-
-  public void setLeafQuery() {
-    this.isLeafQuery = true;
+  public ExecutionBlock getBlock() {
+    return block;
   }
 
   public void addTask(QueryUnit task) {
     tasks.put(task.getId(), task);
   }
-  
-  public void setOutputType(PARTITION_TYPE type) {
-    this.outputType = type;
-  }
-
-  public GlobalPlanner getPlanner() {
-    return planner;
-  }
-  
-  public void setLogicalPlan(LogicalNode plan) {
-    hasJoinPlan = false;
-    Preconditions.checkArgument(plan.getType() == ExprType.STORE
-        || plan.getType() == ExprType.CREATE_INDEX);
-
-    this.plan = plan;
-    if (plan instanceof StoreTableNode) {
-      store = (StoreTableNode) plan;      
-    } else {
-      store = (StoreTableNode) ((IndexWriteNode)plan).getSubNode();
-    }
-    
-    LogicalNode node = plan;
-    ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
-    s.add(node);
-    while (!s.isEmpty()) {
-      node = s.remove(s.size()-1);
-      if (node instanceof UnaryNode) {
-        UnaryNode unary = (UnaryNode) node;
-        s.add(s.size(), unary.getSubNode());
-      } else if (node instanceof BinaryNode) {
-        BinaryNode binary = (BinaryNode) node;
-        if (binary.getType() == ExprType.JOIN) {
-          hasJoinPlan = true;
-        } else if (binary.getType() == ExprType.UNION) {
-          hasUnionPlan = true;
-        }
-        s.add(s.size(), binary.getOuterNode());
-        s.add(s.size(), binary.getInnerNode());
-      } else if (node instanceof ScanNode) {
-        scanlist.add((ScanNode)node);
-      }
-    }
-  }
 
   public void abortSubQuery(SubQueryState finalState) {
     // TODO -
@@ -271,37 +192,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     return this.stateMachine;
   }
 
-  public boolean hasJoinPlan() {
-    return this.hasJoinPlan;
-  }
-
-  public boolean hasUnionPlan() {
-    return this.hasUnionPlan;
-  }
-  
-  public void setParentQuery(SubQuery next) {
-    this.next = next;
-  }
-  
-  public void addChildQuery(ScanNode prevscan, SubQuery prev) {
-    childSubQueries.put(prevscan, prev);
-  }
-  
-  public void addChildQueries(Map<ScanNode, SubQuery> prevs) {
-    this.childSubQueries.putAll(prevs);
-  }
-  
-  public void setQueryUnits(List<QueryUnit> queryUnits) {
-    for (QueryUnit task: queryUnits) {
-      tasks.put(task.getId(), task);
-    }
-  }
-  
-  public void removeChildQuery(ScanNode scan) {
-    scanlist.remove(scan);
-    this.childSubQueries.remove(scan);
-  }
-
   public void setPriority(int priority) {
     if (this.priority == null) {
       this.priority = new Priority(priority);
@@ -316,56 +206,12 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     this.stats = stat;
   }
   
-  public SubQuery getParentQuery() {
-    return this.next;
-  }
-  
-  public boolean hasChildQuery() {
-    return !this.childSubQueries.isEmpty();
-  }
-  
-  public Iterator<SubQuery> getChildIterator() {
-    return this.childSubQueries.values().iterator();
-  }
-  
-  public Collection<SubQuery> getChildQueries() {
-    return this.childSubQueries.values();
-  }
-  
-  public Map<ScanNode, SubQuery> getChildMaps() {
-    return this.childSubQueries;
-  }
-  
   public SubQuery getChildQuery(ScanNode scanForChild) {
-    return this.childSubQueries.get(scanForChild);
-  }
-  
-  public String getOutputName() {
-    return this.store.getTableName();
-  }
-  
-  public PARTITION_TYPE getOutputType() {
-    return this.outputType;
-  }
-  
-  public Schema getOutputSchema() {
-    return this.store.getOutSchema();
-  }
-  
-  public StoreTableNode getStoreTableNode() {
-    return this.store;
-  }
-  
-  public ScanNode[] getScanNodes() {
-    return this.scanlist.toArray(new ScanNode[scanlist.size()]);
-  }
-  
-  public LogicalNode getLogicalPlan() {
-    return this.plan;
+    return queryContext.getSubQuery(block.getChildBlock(scanForChild).getId());
   }
   
   public SubQueryId getId() {
-    return this.id;
+    return block.getId();
   }
   
   public QueryUnit[] getQueryUnits() {
@@ -387,13 +233,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append(this.id);
-/*    sb.append(" plan: " + plan.toString());
-    sb.append("next: " + next + " childSubQueries:");
-    Iterator<SubQuery> it = getChildIterator();
-    while (it.hasNext()) {
-      sb.append(" " + it.next());
-    }*/
+    sb.append(this.getId());
     return sb.toString();
   }
   
@@ -401,18 +241,18 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   public boolean equals(Object o) {
     if (o instanceof SubQuery) {
       SubQuery other = (SubQuery)o;
-      return this.id.equals(other.getId());
+      return getId().equals(other.getId());
     }
     return false;
   }
   
   @Override
   public int hashCode() {
-    return this.id.hashCode();
+    return getId().hashCode();
   }
   
   public int compareTo(SubQuery other) {
-    return this.id.compareTo(other.id);
+    return getId().compareTo(other.getId());
   }
 
   public SubQueryState getState() {
@@ -439,8 +279,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     int numBlocks = 0, numPartitions = 0;
     List<ColumnStat> columnStats = Lists.newArrayList();
 
-    for (SubQuery child : unit.getChildQueries()) {
-      childStat = child.getStats();
+    Iterator<ExecutionBlock> it = unit.getBlock().getChildBlocks().iterator();
+    while (it.hasNext()) {
+      ExecutionBlock block = it.next();
+      SubQuery childSubQuery = unit.queryContext.getSubQuery(block.getId());
+      childStat = childSubQuery.getStats();
       avgRows += childStat.getAvgRows();
       columnStats.addAll(childStat.getColumnStats());
       numBlocks += childStat.getNumBlocks();
@@ -448,6 +291,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       numPartitions += childStat.getNumPartitions();
       numRows += childStat.getNumRows();
     }
+
     stat.setColumnStats(columnStats);
     stat.setNumBlocks(numBlocks);
     stat.setNumBytes(numBytes);
@@ -458,7 +302,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   }
 
   public void cleanUp() {
-    if (hasUnionPlan()) {
+    if (block.hasUnion()) {
       try {
         // write meta and continue
         TableStat stat = generateUnionStat(this);
@@ -469,35 +313,38 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         e.printStackTrace();
       }
     } else {
+      LOG.info("SubQuery: " + getId() + " sets TableStat");
       TableStat stat = generateStat();
-      setStats(stat);
       try {
         writeStat(this, stat);
       } catch (IOException e) {
       }
     }
 
-    finishTime = clock.getTime();
+    finishTime = queryContext.getClock().getTime();
   }
 
 
-  private static class InitAndRequestContainer implements MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> {
+  private static class InitAndRequestContainer implements MultipleArcTransition<SubQuery,
+      SubQueryEvent, SubQueryState> {
 
     @Override
     public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
-      subQuery.startTime = subQuery.clock.getTime();
+      subQuery.startTime = subQuery.queryContext.getClock().getTime();
       subQuery.taskScheduler = new TaskSchedulerImpl(subQuery.queryContext);
       subQuery.taskScheduler.init(subQuery.queryContext.getConf());
       subQuery.taskScheduler.start();
 
+      ExecutionBlock execBlock = subQuery.getBlock();
+
       try {
         // if subquery is dummy, which means it requires only a logical step
         // instead of actual query. An 'union all' is an example of
         // a dummy subquery.
-        if (subQuery.hasUnionPlan()) {
+        if (execBlock.hasUnion()) {
           subQuery.finishUnionUnit();
           subQuery.cleanUp();
-          TableMeta meta = new TableMetaImpl(subQuery.getOutputSchema(),
+          TableMeta meta = new TableMetaImpl(execBlock.getOutputSchema(),
               StoreType.CSV, new Options(), subQuery.getStats());
           subQuery.eventHandler.handle(new SubQuerySucceeEvent(subQuery.getId(),
               meta));
@@ -505,35 +352,39 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         } else {
           QueryUnit [] tasks;
           // TODO - should be improved
-          if (subQuery.isLeafQuery() && subQuery.getScanNodes().length == 1) {
-            SubQuery parent = subQuery.getParentQuery();
+          if (execBlock.isLeafBlock() && execBlock.getScanNodes().length == 1) {
+
             // if parent is join, this subquery is for partitioning data.
-            if (parent != null) {
+            if (execBlock.hasParentBlock()) {
               int numTasks = calculatePartitionNum(subQuery);
-              subQuery.getPlanner().setPartitionNumberForTwoPhase(subQuery, numTasks);
+              Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks);
             }
 
-            tasks = subQuery.getPlanner().createLeafTasks(subQuery);
-          } else if (subQuery.getScanNodes().length > 1) {
-            SubQuery parent = subQuery.getParentQuery();
+            tasks = createLeafTasks(subQuery);
+          } else if (execBlock.getScanNodes().length > 1) {
             // if parent is join, this subquery is for partitioning data.
-            if (parent != null) {
+            if (execBlock.hasParentBlock()) {
               int numTasks = calculatePartitionNum(subQuery);
-              subQuery.getPlanner().setPartitionNumberForTwoPhase(subQuery, numTasks);
+              Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks);
+            }
+
+            if (subQuery.getId().getId() == 15) {
+              System.out.println("error point!");
             }
+
             tasks = Repartitioner.createJoinTasks(subQuery);
 
           } else {
-            SubQuery parent = subQuery.getParentQuery();
             // if parent is join, this subquery is for partitioning data.
-            if (parent != null) {
+            if (execBlock.hasParentBlock()) {
               int partitionNum = calculatePartitionNum(subQuery);
-              subQuery.getPlanner().setPartitionNumberForTwoPhase(subQuery, partitionNum);
+              Repartitioner.setPartitionNumberForTwoPhase(subQuery, partitionNum);
             }
             int numTasks = getNonLeafTaskNum(subQuery);
 
-            tasks = Repartitioner.createNonLeafTask(subQuery,
-                subQuery.getChildIterator().next(), numTasks);
+            SubQueryId childId = subQuery.getBlock().getChildBlocks().iterator().next().getId();
+            SubQuery child = subQuery.queryContext.getSubQuery(childId);
+            tasks = Repartitioner.createNonLeafTask(subQuery, child, numTasks);
           }
           for (QueryUnit task : tasks) {
             subQuery.addTask(task);
@@ -543,7 +394,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
           // if there is no tasks
           if (subQuery.tasks.size() == 0) {
             subQuery.cleanUp();
-            TableMeta meta = toTableMeta(subQuery.getStoreTableNode());
+            TableMeta meta = toTableMeta(execBlock.getStoreTableNode());
             meta.setStat(subQuery.getStats());
             subQuery.eventHandler.handle(new SubQuerySucceeEvent(subQuery.getId(),
                 meta));
@@ -565,10 +416,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
             org.apache.hadoop.yarn.api.records.Priority priority =
                 RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
                     org.apache.hadoop.yarn.api.records.Priority.class);
-            priority.setPriority(100 - subQuery.getPriority().get());
+            priority.setPriority(subQuery.getPriority().get());
             ContainerAllocationEvent event =
                 new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
-                    subQuery.getId(), priority, resource, numRequest, subQuery.isLeafQuery(), 0.0f);
+                    subQuery.getId(), priority, resource, numRequest, execBlock.isLeafBlock(), 0.0f);
             subQuery.eventHandler.handle(event);
           }
         }
@@ -582,6 +433,48 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         return SubQueryState.FAILED;
       }
     }
+    /** Creates one QueryUnit per input fragment of the single table scanned by the given subquery. */
+    public QueryUnit [] createLeafTasks(SubQuery subQuery) throws IOException {
+      ExecutionBlock execBlock = subQuery.getBlock();
+      ScanNode[] scans = execBlock.getScanNodes();
+      Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
+      TableMeta meta;
+      Path inputPath;
+      // Look up the scanned table's path and metadata in the catalog.
+      ScanNode scan = scans[0];
+      TableDesc desc = subQuery.queryContext.getCatalog().getTableDesc(scan.getTableId());
+      inputPath = desc.getPath();
+      meta = desc.getMeta();
+
+      // TODO - the inner "data" directory handling should be changed; for now it is used whenever it exists
+      Path oldPath = new Path(inputPath, "data");
+      FileSystem fs = inputPath.getFileSystem(subQuery.queryContext.getConf());
+      if (fs.exists(oldPath)) {
+        inputPath = oldPath;
+      }
+      List<Fragment> fragments = subQuery.getStorageManager().getSplits(scan.getTableId(), meta, inputPath);
+
+      QueryUnit queryUnit;
+      List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
+      // Task ids are handed out sequentially from 0, one per fragment.
+      int i = 0;
+      for (Fragment fragment : fragments) {
+        queryUnit = newQueryUnit(subQuery, i++);
+        queryUnit.setFragment(scan.getTableId(), fragment);
+        queryUnits.add(queryUnit);
+      }
+
+      return queryUnits.toArray(new QueryUnit[queryUnits.size()]);
+    }
+    /** Builds a QueryUnit whose id is made from the subquery id and taskId, and hands it the block's plan. */
+    private QueryUnit newQueryUnit(SubQuery subQuery, int taskId) {
+      ExecutionBlock execBlock = subQuery.getBlock();
+      QueryUnit unit = new QueryUnit(
+          QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), execBlock.isLeafBlock(),
+          subQuery.eventHandler);
+      unit.setLogicalPlan(execBlock.getPlan());
+      return unit;
+    }
   }
 
   /**
@@ -593,25 +486,25 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
    */
   public static int calculatePartitionNum(SubQuery subQuery) {
     TajoConf conf = subQuery.queryContext.getConf();
-    SubQuery parent = subQuery.getParentQuery();
+    ExecutionBlock parent = subQuery.getBlock().getParentBlock();
 
     GroupbyNode grpNode = null;
     if (parent != null) {
       grpNode = (GroupbyNode) PlannerUtil.findTopNode(
-          parent.getLogicalPlan(), ExprType.GROUP_BY);
+          parent.getPlan(), ExprType.GROUP_BY);
     }
 
     // Is this subquery the first step of join?
     if (parent != null && parent.getScanNodes().length == 2) {
-      Iterator<SubQuery> child = parent.getChildQueries().iterator();
+      Iterator<ExecutionBlock> child = parent.getChildBlocks().iterator();
 
       // for inner
-      SubQuery outer = child.next();
-      long outerVolume = getInputVolume(outer);
+      ExecutionBlock outer = child.next();
+      long outerVolume = getInputVolume(subQuery.queryContext, outer);
 
       // for inner
-      SubQuery inner = child.next();
-      long innerVolume = getInputVolume(inner);
+      ExecutionBlock inner = child.next();
+      long innerVolume = getInputVolume(subQuery.queryContext, inner);
       LOG.info("Outer volume: " + Math.ceil((double)outerVolume / 1048576));
       LOG.info("Inner volume: " + Math.ceil((double)innerVolume / 1048576));
 
@@ -631,7 +524,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       if (grpNode.getGroupingColumns().length == 0) {
         return 1;
       } else {
-        long volume = getInputVolume(subQuery);
+        long volume = getInputVolume(subQuery.queryContext, subQuery.block);
 
         int mb = (int) Math.ceil((double)volume / 1048576);
         LOG.info("Table's volume is approximately " + mb + " MB");
@@ -643,7 +536,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       }
     } else {
       LOG.info("============>>>>> Unexpected Case! <<<<<================");
-      long volume = getInputVolume(subQuery);
+      long volume = getInputVolume(subQuery.queryContext, subQuery.block);
 
       int mb = (int) Math.ceil((double)volume / 1048576);
       LOG.info("Table's volume is approximately " + mb + " MB");
@@ -654,20 +547,20 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
   }
 
-  public static long getInputVolume(SubQuery subQuery) {
-    CatalogService catalog = subQuery.queryContext.getCatalog();
-    if (subQuery.hasChildQuery()) {
-      Iterator<SubQuery> it = subQuery.getChildQueries().iterator();
+  public static long getInputVolume(QueryContext context, ExecutionBlock execBlock) {
+    CatalogService catalog = context.getCatalog();
+    if (execBlock.isLeafBlock()) {
+      ScanNode outerScan = execBlock.getScanNodes()[0];
+      TableStat stat = catalog.getTableDesc(outerScan.getTableId()).getMeta().getStat();
+      return stat.getNumBytes();
+    } else {
       long aggregatedVolume = 0;
-      while(it.hasNext()) {
-        aggregatedVolume += it.next().getStats().getNumBytes();
+      for (ExecutionBlock childBlock : execBlock.getChildBlocks()) {
+        SubQuery subquery = context.getSubQuery(childBlock.getId());
+        aggregatedVolume += subquery.getStats().getNumBytes();
       }
 
       return aggregatedVolume;
-    } else {
-      ScanNode outerScan = subQuery.getScanNodes()[0];
-      TableStat stat = catalog.getTableDesc(outerScan.getTableId()).getMeta().getStat();
-      return stat.getNumBytes();
     }
   }
 
@@ -679,7 +572,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
    */
   public static int getNonLeafTaskNum(SubQuery subQuery) {
     // Getting intermediate data size
-    long volume = getInputVolume(subQuery);
+    long volume = getInputVolume(subQuery.queryContext, subQuery.getBlock());
 
     int mb = (int) Math.ceil((double)volume / 1048576);
     LOG.info("Table's volume is approximately " + mb + " MB");
@@ -766,6 +659,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       // TODO - records succeeded, failed, killed completed task
       // TODO - records metrics
 
+      ExecutionBlock execBlock = subQuery.getBlock();
+
       for (Entry<ContainerId, Container> entry : subQuery.containers.entrySet()) {
         subQuery.eventHandler.handle(new TaskRunnerStopEvent(subQuery.getId(),
             entry.getValue()));
@@ -773,13 +668,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       subQuery.cleanUp();
       subQuery.taskScheduler.stop();
 
-      StoreTableNode storeTableNode = subQuery.getStoreTableNode();
+      StoreTableNode storeTableNode = execBlock.getStoreTableNode();
       TableMeta meta = toTableMeta(storeTableNode);
       meta.setStat(subQuery.getStats());
 
       subQuery.eventHandler.handle(new SubQuerySucceeEvent(subQuery.getId(),
           meta));
-      subQuery.finishTime = subQuery.clock.getTime();
+      subQuery.finishTime = subQuery.queryContext.getClock().getTime();
     }
   }
 
@@ -809,15 +704,16 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
   private void writeStat(SubQuery subQuery, TableStat stat)
       throws IOException {
+    ExecutionBlock execBlock = subQuery.getBlock();
 
-    if (subQuery.getLogicalPlan().getType() == ExprType.CREATE_INDEX) {
-      IndexWriteNode index = (IndexWriteNode) subQuery.getLogicalPlan();
+    if (execBlock.getPlan().getType() == ExprType.CREATE_INDEX) {
+      IndexWriteNode index = (IndexWriteNode) execBlock.getPlan();
       Path indexPath = new Path(sm.getTablePath(index.getTableName()), "index");
       TableMeta meta;
       if (sm.getFileSystem().exists(new Path(indexPath, ".meta"))) {
         meta = sm.getTableMeta(indexPath);
       } else {
-        StoreTableNode storeTableNode = subQuery.getStoreTableNode();
+        StoreTableNode storeTableNode = execBlock.getStoreTableNode();
         meta = toTableMeta(storeTableNode);
       }
       String indexName = IndexUtil.getIndexName(index.getTableName(),
@@ -828,10 +724,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       sm.writeTableMeta(indexPath, meta);
 
     } else {
-      StoreTableNode storeTableNode = subQuery.getStoreTableNode();
+      StoreTableNode storeTableNode = execBlock.getStoreTableNode();
       TableMeta meta = toTableMeta(storeTableNode);
       meta.setStat(stat);
-      sm.writeTableMeta(sm.getTablePath(subQuery.getOutputName()), meta);
+      sm.writeTableMeta(sm.getTablePath(execBlock.getOutputName()), meta);
     }
   }
 
@@ -845,19 +741,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
   }
 
-  private void finalizePrevSubQuery(SubQuery subQuery)
-      throws Exception {
-    SubQuery prevSubQuery;
-    for (ScanNode scan : subQuery.getScanNodes()) {
-      prevSubQuery = subQuery.getChildQuery(scan);
-      if (prevSubQuery.getStoreTableNode().getSubNode().getType() != ExprType.UNION) {
-        for (QueryUnit unit : prevSubQuery.getQueryUnits()) {
-          //sendCommand(unit.getLastAttempt(), CommandType.FINALIZE);
-        }
-      }
-    }
-  }
-
   @Override
   public void handle(SubQueryEvent event) {
     //if (LOG.isDebugEnabled()) {
@@ -871,14 +754,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         getStateMachine().doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
         LOG.error("Can't handle this event at current state", e);
-        eventHandler.handle(new SubQueryEvent(this.id,
+        eventHandler.handle(new SubQueryEvent(getId(),
             SubQueryEventType.SQ_INTERNAL_ERROR));
       }
 
       //notify the eventhandler of state change
       if (LOG.isDebugEnabled()) {
         if (oldState != getState()) {
-          LOG.debug(id + " SubQuery Transitioned from " + oldState + " to "
+          LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to "
               + getState());
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskContainerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskContainerManager.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskContainerManager.java
deleted file mode 100644
index 8756dc4..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskContainerManager.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.hadoop.yarn.service.AbstractService;
-
-public class TaskContainerManager extends AbstractService {
-
-  public TaskContainerManager() {
-    super(TaskContainerManager.class.getName());
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
index 3f92608..816e285 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
@@ -389,8 +389,7 @@ public class TaskSchedulerImpl extends AbstractService
           LOG.debug("Assigned based on * match");
 
           QueryUnit task;
-          task = context.getQuery()
-              .getSubQuery(attemptId.getSubQueryId()).getQueryUnit(attemptId.getQueryUnitId());
+          task = context.getSubQuery(attemptId.getSubQueryId()).getQueryUnit(attemptId.getQueryUnitId());
           QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
               attemptId,
               Lists.newArrayList(task.getAllFragments()),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java
index 93b786d..43fc1d0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java
@@ -126,7 +126,7 @@ public class WorkerListener extends AbstractService
                    QueryUnitAttemptIdProto attemptIdProto,
                    RpcCallback<BoolProto> done) {
     QueryUnitAttemptId attemptId = new QueryUnitAttemptId(attemptIdProto);
-    context.getQuery(attemptId.getQueryId()).getContext().getQuery().getSubQuery(attemptId.getSubQueryId()).
+    context.getQuery(attemptId.getQueryId()).getContext().getSubQuery(attemptId.getSubQueryId()).
         getQueryUnit(attemptId.getQueryUnitId()).getAttempt(attemptId).
         resetExpireTime();
     done.run(TRUE_PROTO);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
index 28ccbfe..d0fae9f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
@@ -47,7 +47,7 @@ import tajo.engine.planner.logical.StoreTableNode;
 import tajo.engine.planner.physical.PhysicalExec;
 import tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService.Interface;
 import tajo.ipc.protocolrecords.QueryUnitRequest;
-import tajo.master.SubQuery.PARTITION_TYPE;
+import tajo.master.ExecutionBlock.PartitionType;
 import tajo.rpc.NullCallback;
 import tajo.storage.Fragment;
 import tajo.storage.StorageUtil;
@@ -101,7 +101,7 @@ public class Task {
   private AtomicBoolean progressFlag = new AtomicBoolean(false);
 
   // TODO - to be refactored
-  private PARTITION_TYPE partitionType = null;
+  private PartitionType partitionType = null;
   private Schema finalSchema = null;
   private TupleComparator sortComp = null;
 
@@ -153,7 +153,7 @@ public class Task {
       context.setInterQuery();
       StoreTableNode store = (StoreTableNode) plan;
       this.partitionType = store.getPartitionType();
-      if (partitionType == PARTITION_TYPE.RANGE) {
+      if (partitionType == PartitionType.RANGE) {
         SortNode sortNode = (SortNode) store.getSubNode();
         this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
         this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
index 2af74bb..be8a7f1 100644
--- a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
@@ -46,9 +46,9 @@ import tajo.engine.planner.LogicalPlanner;
 import tajo.engine.planner.PlanningContext;
 import tajo.engine.planner.global.MasterPlan;
 import tajo.engine.planner.logical.*;
+import tajo.master.ExecutionBlock;
+import tajo.master.ExecutionBlock.PartitionType;
 import tajo.master.GlobalPlanner;
-import tajo.master.SubQuery;
-import tajo.master.SubQuery.PARTITION_TYPE;
 import tajo.master.TajoMaster;
 import tajo.storage.*;
 
@@ -159,11 +159,11 @@ public class TestGlobalQueryPlanner {
     MasterPlan globalPlan = planner.build(queryId,
         (LogicalRootNode) plan1);
 
-    SubQuery unit = globalPlan.getRoot();
-    assertFalse(unit.hasChildQuery());
-    assertEquals(PARTITION_TYPE.LIST, unit.getOutputType());
+    ExecutionBlock unit = globalPlan.getRoot();
+    assertFalse(unit.hasChildBlock());
+    assertEquals(PartitionType.LIST, unit.getPartitionType());
 
-    LogicalNode plan2 = unit.getLogicalPlan();
+    LogicalNode plan2 = unit.getPlan();
     assertEquals(ExprType.STORE, plan2.getType());
     assertEquals(ExprType.SCAN, ((StoreTableNode)plan2).getSubNode().getType());
   }
@@ -178,20 +178,20 @@ public class TestGlobalQueryPlanner {
 
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
 
-    SubQuery next, prev;
+    ExecutionBlock next, prev;
     
     next = globalPlan.getRoot();
-    assertTrue(next.hasChildQuery());
-    assertEquals(PARTITION_TYPE.LIST, next.getOutputType());
+    assertTrue(next.hasChildBlock());
+    assertEquals(PartitionType.LIST, next.getPartitionType());
     for (ScanNode scan : next.getScanNodes()) {
       assertTrue(scan.isLocal());
     }
     assertFalse(next.getStoreTableNode().isLocal());
-    Iterator<SubQuery> it= next.getChildIterator();
+    Iterator<ExecutionBlock> it= next.getChildBlocks().iterator();
     
     prev = it.next();
-    assertFalse(prev.hasChildQuery());
-    assertEquals(PARTITION_TYPE.HASH, prev.getOutputType());
+    assertFalse(prev.hasChildBlock());
+    assertEquals(PartitionType.HASH, prev.getPartitionType());
     assertTrue(prev.getStoreTableNode().isLocal());
     assertFalse(it.hasNext());
     
@@ -216,26 +216,26 @@ public class TestGlobalQueryPlanner {
 
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
 
-    SubQuery next, prev;
+    ExecutionBlock next, prev;
     
     next = globalPlan.getRoot();
     assertEquals(ExprType.PROJECTION,
         next.getStoreTableNode().getSubNode().getType());
-    assertTrue(next.hasChildQuery());
-    assertEquals(PARTITION_TYPE.LIST, next.getOutputType());
-    Iterator<SubQuery> it= next.getChildIterator();
+    assertTrue(next.hasChildBlock());
+    assertEquals(PartitionType.LIST, next.getPartitionType());
+    Iterator<ExecutionBlock> it= next.getChildBlocks().iterator();
 
     prev = it.next();
     assertEquals(ExprType.SORT,
         prev.getStoreTableNode().getSubNode().getType());
-    assertTrue(prev.hasChildQuery());
-    assertEquals(PARTITION_TYPE.LIST, prev.getOutputType());
-    it= prev.getChildIterator();
+    assertTrue(prev.hasChildBlock());
+    assertEquals(PartitionType.LIST, prev.getPartitionType());
+    it= prev.getChildBlocks().iterator();
     next = prev;
     
     prev = it.next();
-    assertFalse(prev.hasChildQuery());
-    assertEquals(PARTITION_TYPE.RANGE, prev.getOutputType());
+    assertFalse(prev.hasChildBlock());
+    assertEquals(PartitionType.RANGE, prev.getPartitionType());
     assertFalse(it.hasNext());
     
     ScanNode []scans = prev.getScanNodes();
@@ -259,59 +259,59 @@ public class TestGlobalQueryPlanner {
 
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
 
-    SubQuery next, prev;
+    ExecutionBlock next, prev;
     
     // the second phase of the sort
     next = globalPlan.getRoot();
-    assertTrue(next.hasChildQuery());
-    assertEquals(PARTITION_TYPE.LIST, next.getOutputType());
+    assertTrue(next.hasChildBlock());
+    assertEquals(PartitionType.LIST, next.getPartitionType());
     assertEquals(ExprType.PROJECTION, next.getStoreTableNode().getSubNode().getType());
     ScanNode []scans = next.getScanNodes();
     assertEquals(1, scans.length);
-    Iterator<SubQuery> it= next.getChildIterator();
+    Iterator<ExecutionBlock> it= next.getChildBlocks().iterator();
 
     prev = it.next();
     assertEquals(ExprType.SORT, prev.getStoreTableNode().getSubNode().getType());
-    assertEquals(PARTITION_TYPE.LIST, prev.getOutputType());
+    assertEquals(PartitionType.LIST, prev.getPartitionType());
     scans = prev.getScanNodes();
     assertEquals(1, scans.length);
-    it= prev.getChildIterator();
+    it= prev.getChildBlocks().iterator();
     
     // the first phase of the sort
     prev = it.next();
     assertEquals(ExprType.SORT, prev.getStoreTableNode().getSubNode().getType());
     assertEquals(scans[0].getInSchema(), prev.getOutputSchema());
-    assertTrue(prev.hasChildQuery());
-    assertEquals(PARTITION_TYPE.RANGE, prev.getOutputType());
+    assertTrue(prev.hasChildBlock());
+    assertEquals(PartitionType.RANGE, prev.getPartitionType());
     assertFalse(it.hasNext());
     scans = prev.getScanNodes();
     assertEquals(1, scans.length);
     next = prev;
-    it= next.getChildIterator();
+    it= next.getChildBlocks().iterator();
     
     // the second phase of the join
     prev = it.next();
     assertEquals(ExprType.JOIN, prev.getStoreTableNode().getSubNode().getType());
     assertEquals(scans[0].getInSchema(), prev.getOutputSchema());
-    assertTrue(prev.hasChildQuery());
-    assertEquals(PARTITION_TYPE.LIST, prev.getOutputType());
+    assertTrue(prev.hasChildBlock());
+    assertEquals(PartitionType.LIST, prev.getPartitionType());
     assertFalse(it.hasNext());
     scans = prev.getScanNodes();
     assertEquals(2, scans.length);
     next = prev;
-    it= next.getChildIterator();
+    it= next.getChildBlocks().iterator();
     
     // the first phase of the join
     prev = it.next();
     assertEquals(ExprType.SCAN, prev.getStoreTableNode().getSubNode().getType());
-    assertFalse(prev.hasChildQuery());
-    assertEquals(PARTITION_TYPE.HASH, prev.getOutputType());
+    assertFalse(prev.hasChildBlock());
+    assertEquals(PartitionType.HASH, prev.getPartitionType());
     assertEquals(1, prev.getScanNodes().length);
     
     prev = it.next();
     assertEquals(ExprType.SCAN, prev.getStoreTableNode().getSubNode().getType());
-    assertFalse(prev.hasChildQuery());
-    assertEquals(PARTITION_TYPE.HASH, prev.getOutputType());
+    assertFalse(prev.hasChildBlock());
+    assertEquals(PartitionType.HASH, prev.getPartitionType());
     assertEquals(1, prev.getScanNodes().length);
     assertFalse(it.hasNext());
   }
@@ -325,15 +325,15 @@ public class TestGlobalQueryPlanner {
     
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
 
-    SubQuery unit = globalPlan.getRoot();
+    ExecutionBlock unit = globalPlan.getRoot();
     StoreTableNode store = unit.getStoreTableNode();
     assertEquals(ExprType.JOIN, store.getSubNode().getType());
-    assertTrue(unit.hasChildQuery());
+    assertTrue(unit.hasChildBlock());
     ScanNode [] scans = unit.getScanNodes();
     assertEquals(2, scans.length);
-    SubQuery prev;
+    ExecutionBlock prev;
     for (ScanNode scan : scans) {
-      prev = unit.getChildQuery(scan);
+      prev = unit.getChildBlock(scan);
       store = prev.getStoreTableNode();
       assertEquals(ExprType.SCAN, store.getSubNode().getType());
     }
@@ -348,14 +348,14 @@ public class TestGlobalQueryPlanner {
 
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
 
-    SubQuery unit = globalPlan.getRoot();
+    ExecutionBlock unit = globalPlan.getRoot();
     StoreTableNode store = unit.getStoreTableNode();
     assertEquals(ExprType.PROJECTION, store.getSubNode().getType());
 
     ScanNode[] scans = unit.getScanNodes();
     assertEquals(1, scans.length);
 
-    unit = unit.getChildQuery(scans[0]);
+    unit = unit.getChildBlock(scans[0]);
     store = unit.getStoreTableNode();
     assertEquals(ExprType.UNION, store.getSubNode().getType());
     UnionNode union = (UnionNode) store.getSubNode();
@@ -367,11 +367,11 @@ public class TestGlobalQueryPlanner {
     union = (UnionNode) union.getInnerNode();
     assertEquals(ExprType.SCAN, union.getOuterNode().getType());
     assertEquals(ExprType.SCAN, union.getInnerNode().getType());
-    assertTrue(unit.hasChildQuery());
+    assertTrue(unit.hasChildBlock());
     
     String tableId = "";
     for (ScanNode scan : unit.getScanNodes()) {
-      SubQuery prev = unit.getChildQuery(scan);
+      ExecutionBlock prev = unit.getChildBlock(scan);
       store = prev.getStoreTableNode();
       assertEquals(ExprType.GROUP_BY, store.getSubNode().getType());
       GroupbyNode groupby = (GroupbyNode) store.getSubNode();
@@ -382,7 +382,7 @@ public class TestGlobalQueryPlanner {
         assertEquals(tableId, store.getTableName());
       }
       assertEquals(1, prev.getScanNodes().length);
-      prev = prev.getChildQuery(prev.getScanNodes()[0]);
+      prev = prev.getChildBlock(prev.getScanNodes()[0]);
       store = prev.getStoreTableNode();
       assertEquals(ExprType.GROUP_BY, store.getSubNode().getType());
       groupby = (GroupbyNode) store.getSubNode();
@@ -486,13 +486,13 @@ public class TestGlobalQueryPlanner {
 
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
 
-    SubQuery second, first, mid;
+    ExecutionBlock second, first, mid;
     ScanNode secondScan, firstScan, midScan;
 
     second = globalPlan.getRoot();
     assertTrue(second.getScanNodes().length == 1);
 
-    first = second.getChildQuery(second.getScanNodes()[0]);
+    first = second.getChildBlock(second.getScanNodes()[0]);
 
     GroupbyNode firstGroupby, secondGroupby, midGroupby;
     secondGroupby = (GroupbyNode) second.getStoreTableNode().getSubNode();
@@ -510,10 +510,10 @@ public class TestGlobalQueryPlanner {
     midScan = mid.getScanNodes()[0];
     firstScan = first.getScanNodes()[0];
 
-    assertTrue(first.getParentQuery().equals(mid));
-    assertTrue(mid.getParentQuery().equals(second));
-    assertTrue(second.getChildQuery(secondScan).equals(mid));
-    assertTrue(mid.getChildQuery(midScan).equals(first));
+    assertTrue(first.getParentBlock().equals(mid));
+    assertTrue(mid.getParentBlock().equals(second));
+    assertTrue(second.getChildBlock(secondScan).equals(mid));
+    assertTrue(mid.getChildBlock(midScan).equals(first));
     assertEquals(first.getOutputName(), midScan.getTableId());
     assertEquals(first.getOutputSchema(), midScan.getInSchema());
     assertEquals(mid.getOutputName(), secondScan.getTableId());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
index fae28f7..c0d0dd2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
@@ -43,8 +43,8 @@ import tajo.engine.planner.LogicalOptimizer;
 import tajo.engine.planner.LogicalPlanner;
 import tajo.engine.planner.PlanningContext;
 import tajo.engine.planner.logical.*;
+import tajo.master.ExecutionBlock;
 import tajo.master.GlobalPlanner;
-import tajo.master.SubQuery;
 import tajo.storage.*;
 
 import java.io.IOException;
@@ -144,7 +144,7 @@ public class TestGlobalQueryOptimizer {
         (LogicalRootNode) plan);
     globalPlan = optimizer.optimize(globalPlan);
     
-    SubQuery unit = globalPlan.getRoot();
+    ExecutionBlock unit = globalPlan.getRoot();
     StoreTableNode store = unit.getStoreTableNode();
     assertEquals(ExprType.PROJECTION, store.getSubNode().getType());
     ProjectionNode proj = (ProjectionNode) store.getSubNode();
@@ -153,16 +153,16 @@ public class TestGlobalQueryOptimizer {
     assertEquals(ExprType.SCAN, sort.getSubNode().getType());
     ScanNode scan = (ScanNode) sort.getSubNode();
     
-    assertTrue(unit.hasChildQuery());
-    unit = unit.getChildQuery(scan);
+    assertTrue(unit.hasChildBlock());
+    unit = unit.getChildBlock(scan);
     store = unit.getStoreTableNode();
     assertEquals(ExprType.SORT, store.getSubNode().getType());
     sort = (SortNode) store.getSubNode();
     assertEquals(ExprType.JOIN, sort.getSubNode().getType());
     
-    assertTrue(unit.hasChildQuery());
+    assertTrue(unit.hasChildBlock());
     for (ScanNode prevscan : unit.getScanNodes()) {
-      SubQuery prev = unit.getChildQuery(prevscan);
+      ExecutionBlock prev = unit.getChildBlock(prevscan);
       store = prev.getStoreTableNode();
       assertEquals(ExprType.SCAN, store.getSubNode().getType());
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 6b8496a..8b5dee5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -46,7 +46,7 @@ import tajo.engine.planner.logical.LogicalNode;
 import tajo.engine.planner.logical.LogicalRootNode;
 import tajo.engine.planner.logical.StoreTableNode;
 import tajo.engine.planner.logical.UnionNode;
-import tajo.master.SubQuery;
+import tajo.master.ExecutionBlock.PartitionType;
 import tajo.master.TajoMaster;
 import tajo.storage.*;
 import tajo.storage.index.bst.BSTIndex;
@@ -424,7 +424,7 @@ public class TestPhysicalPlanner {
     Column key1 = new Column("score.deptName", DataType.STRING);
     Column key2 = new Column("score.class", DataType.STRING);
     StoreTableNode storeNode = new StoreTableNode("partition");
-    storeNode.setPartitions(SubQuery.PARTITION_TYPE.HASH, new Column[]{key1, key2}, numPartitions);
+    storeNode.setPartitions(PartitionType.HASH, new Column[]{key1, key2}, numPartitions);
     PlannerUtil.insertNode(plan, storeNode);
     plan = LogicalOptimizer.optimize(context, plan);
 
@@ -482,7 +482,7 @@ public class TestPhysicalPlanner {
 
     int numPartitions = 1;
     StoreTableNode storeNode = new StoreTableNode("emptyset");
-    storeNode.setPartitions(SubQuery.PARTITION_TYPE.HASH, new Column[] {}, numPartitions);
+    storeNode.setPartitions(PartitionType.HASH, new Column[] {}, numPartitions);
     PlannerUtil.insertNode(plan, storeNode);
     plan = LogicalOptimizer.optimize(context, plan);
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestExecutionBlockCursor.java
new file mode 100644
index 0000000..c6a5d43
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestExecutionBlockCursor.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package tajo.master;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import tajo.QueryIdFactory;
+import tajo.TajoTestingCluster;
+import tajo.benchmark.TPCH;
+import tajo.catalog.CatalogService;
+import tajo.catalog.TCatUtil;
+import tajo.catalog.TableDesc;
+import tajo.catalog.TableMeta;
+import tajo.catalog.proto.CatalogProtos;
+import tajo.conf.TajoConf;
+import tajo.engine.parser.QueryAnalyzer;
+import tajo.engine.planner.LogicalPlanner;
+import tajo.engine.planner.PlanningContext;
+import tajo.engine.planner.global.MasterPlan;
+import tajo.engine.planner.logical.LogicalNode;
+import tajo.engine.planner.logical.LogicalRootNode;
+import tajo.storage.StorageManager;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestExecutionBlockCursor {
+  private static TajoTestingCluster util;
+  private static TajoConf conf;
+  private static CatalogService catalog;
+  private static GlobalPlanner planner;
+  private static QueryAnalyzer analyzer;
+  private static LogicalPlanner logicalPlanner;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.startCatalogCluster();
+
+    conf = util.getConfiguration();
+    catalog = util.getMiniCatalogCluster().getCatalog();
+    TPCH tpch = new TPCH();
+    tpch.loadSchemas();
+    tpch.loadOutSchema();
+    for (String table : tpch.getTableNames()) {
+      TableMeta m = TCatUtil.newTableMeta(tpch.getSchema(table), CatalogProtos.StoreType.CSV);
+      TableDesc d = TCatUtil.newTableDesc(table, m, new Path("file:///"));
+      catalog.addTable(d);
+    }
+
+    analyzer = new QueryAnalyzer(catalog);
+    logicalPlanner = new LogicalPlanner(catalog);
+
+    StorageManager sm  = new StorageManager(conf);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    planner = new GlobalPlanner(conf, catalog, sm, dispatcher.getEventHandler());
+  }
+
+  public static void tearDown() {
+    util.shutdownCatalogCluster();
+  }
+
+  @Test
+  public void testNextBlock() throws Exception {
+    PlanningContext context = analyzer.parse(
+        "select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment, ps_supplycost, " +
+            "r_name, p_type, p_size " +
+            "from region join nation on n_regionkey = r_regionkey and r_name = 'AMERICA' " +
+            "join supplier on s_nationkey = n_nationkey " +
+            "join partsupp on s_suppkey = ps_suppkey " +
+            "join part on p_partkey = ps_partkey and p_type like '%BRASS' and p_size = 15");
+    LogicalNode logicalPlan = logicalPlanner.createPlan(context);
+    MasterPlan plan = planner.build(QueryIdFactory.newQueryId(), (LogicalRootNode) logicalPlan);
+
+    ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan);
+
+    int count = 0;
+    while(cursor.hasNext()) {
+      cursor.nextBlock();
+      count++;
+    }
+
+    // 5 input relations, 4 joins, and 1 projection = 10 execution blocks
+    assertEquals(10, count);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java b/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java
index ca3e870..2de54b3 100644
--- a/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java
@@ -23,6 +23,7 @@ import org.junit.Test;
 import tajo.QueryId;
 import tajo.SubQueryId;
 import tajo.TestQueryUnitId;
+import tajo.master.ExecutionBlock.PartitionType;
 import tajo.util.TUtil;
 import tajo.util.TajoIdUtils;
 
@@ -47,7 +48,7 @@ public class TestRepartitioner {
 
     Collection<URI> uris = Repartitioner.
         createHashFetchURL(hostName + ":" + port, sid, partitionId,
-            SubQuery.PARTITION_TYPE.HASH, intermediateEntries);
+            PartitionType.HASH, intermediateEntries);
 
     List<String> taList = TUtil.newList();
     for (URI uri : uris) {


[2/2] git commit: TAJO-42: Divide SubQuery into FSM and execution block parts. (hyunsik)

Posted by hy...@apache.org.
TAJO-42: Divide SubQuery into FSM and execution block parts. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/fef3dd50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/fef3dd50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/fef3dd50

Branch: refs/heads/master
Commit: fef3dd509cad795d5de034253ea19e23219b9c1b
Parents: 6cf96bd
Author: Hyunsik Choi <hy...@apache.org>
Authored: Sat Apr 27 10:27:29 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Sat Apr 27 10:27:29 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 .../engine/planner/global/GlobalOptimizer.java     |   63 +-
 .../tajo/engine/planner/global/MasterPlan.java     |   16 +-
 .../engine/planner/logical/StoreTableNode.java     |   10 +-
 .../src/main/java/tajo/master/ExecutionBlock.java  |  181 +++++
 .../java/tajo/master/ExecutionBlockCursor.java     |   80 ++
 .../src/main/java/tajo/master/GlobalPlanner.java   |  631 +++------------
 .../src/main/java/tajo/master/Query.java           |  299 ++------
 .../src/main/java/tajo/master/QueryMaster.java     |   12 +-
 .../src/main/java/tajo/master/Repartitioner.java   |  135 ++--
 .../src/main/java/tajo/master/SchedulerUtils.java  |   47 --
 .../src/main/java/tajo/master/SubQuery.java        |  381 +++------
 .../java/tajo/master/TaskContainerManager.java     |   30 -
 .../main/java/tajo/master/TaskSchedulerImpl.java   |    3 +-
 .../java/tajo/master/cluster/WorkerListener.java   |    2 +-
 .../src/main/java/tajo/worker/Task.java            |    6 +-
 .../engine/plan/global/TestGlobalQueryPlanner.java |  104 ++--
 .../planner/global/TestGlobalQueryOptimizer.java   |   12 +-
 .../planner/physical/TestPhysicalPlanner.java      |    6 +-
 .../java/tajo/master/TestExecutionBlockCursor.java |  101 +++
 .../test/java/tajo/master/TestRepartitioner.java   |    3 +-
 21 files changed, 836 insertions(+), 1288 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 787cf14..33b422c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-42: Divide SubQuery into FSM and execution block parts. (hyunsik)
+
     TAJO-32: Cleanup TaskRunner. (hyunsik)
 
     TAJO-27: Modify the document links to point the wiki's ones. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/GlobalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/GlobalOptimizer.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/GlobalOptimizer.java
index 6ebcaef..4c50669 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/GlobalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/GlobalOptimizer.java
@@ -24,11 +24,8 @@ import tajo.engine.planner.logical.ExprType;
 import tajo.engine.planner.logical.LogicalNode;
 import tajo.engine.planner.logical.ScanNode;
 import tajo.engine.planner.logical.UnaryNode;
-import tajo.master.SubQuery;
-import tajo.master.SubQuery.PARTITION_TYPE;
-
-import java.util.Collection;
-import java.util.Iterator;
+import tajo.master.ExecutionBlock;
+import tajo.master.ExecutionBlock.PartitionType;
 
 public class GlobalOptimizer {
 
@@ -37,62 +34,46 @@ public class GlobalOptimizer {
   }
   
   public MasterPlan optimize(MasterPlan plan) {
-    SubQuery reducedStep = reduceSchedules(plan.getRoot());
-    SubQuery joinChosen = chooseJoinAlgorithm(reducedStep);
+    ExecutionBlock reducedStep = reduceSchedules(plan.getRoot());
 
-    MasterPlan optimized = new MasterPlan(joinChosen);
+    MasterPlan optimized = new MasterPlan(reducedStep);
     optimized.setOutputTableName(plan.getOutputTable());
 
     return optimized;
   }
   
   @VisibleForTesting
-  private SubQuery chooseJoinAlgorithm(SubQuery logicalUnit) {
-    
-    return logicalUnit;
-  }
-  
-  @VisibleForTesting
-  private SubQuery reduceSchedules(SubQuery logicalUnit) {
+  private ExecutionBlock reduceSchedules(ExecutionBlock logicalUnit) {
     reduceLogicalQueryUnitStep_(logicalUnit);
     return logicalUnit;
   }
-  
-  private void reduceLogicalQueryUnitStep_(SubQuery cur) {
-    if (cur.hasChildQuery()) {
-      Iterator<SubQuery> it = cur.getChildIterator();
-      SubQuery prev;
-      while (it.hasNext()) {
-        prev = it.next();
-        reduceLogicalQueryUnitStep_(prev);
-      }
-      
-      Collection<SubQuery> prevs = cur.getChildQueries();
-      it = prevs.iterator();
-      while (it.hasNext()) {
-        prev = it.next();
-        if (prev.getStoreTableNode().getSubNode().getType() != ExprType.UNION &&
-            prev.getOutputType() == PARTITION_TYPE.LIST) {
-          mergeLogicalUnits(cur, prev);
-        }
+
+  private void reduceLogicalQueryUnitStep_(ExecutionBlock cur) {
+    if (cur.hasChildBlock()) {
+      for (ExecutionBlock childBlock: cur.getChildBlocks())
+        reduceLogicalQueryUnitStep_(childBlock);
+    }
+
+    for (ExecutionBlock childBlock: cur.getChildBlocks()) {
+      if (childBlock.getStoreTableNode().getSubNode().getType() != ExprType.UNION &&
+          childBlock.getPartitionType() == PartitionType.LIST) {
+        mergeLogicalUnits(cur, childBlock);
       }
     }
   }
   
-  private SubQuery mergeLogicalUnits(SubQuery parent,
-      SubQuery child) {
-    LogicalNode p = PlannerUtil.findTopParentNode(parent.getLogicalPlan(), 
-        ExprType.SCAN);
-//    Preconditions.checkArgument(p instanceof UnaryNode);
+  private ExecutionBlock mergeLogicalUnits(ExecutionBlock parent, ExecutionBlock child) {
+    LogicalNode p = PlannerUtil.findTopParentNode(parent.getPlan(), ExprType.SCAN);
+
     if (p instanceof UnaryNode) {
       UnaryNode u = (UnaryNode) p;
       ScanNode scan = (ScanNode) u.getSubNode();
       LogicalNode c = child.getStoreTableNode().getSubNode();
 
-      parent.removeChildQuery(scan);
+      parent.removeChildBlock(scan);
       u.setSubNode(c);
-      parent.setLogicalPlan(parent.getLogicalPlan());
-      parent.addChildQueries(child.getChildMaps());
+      parent.setPlan(parent.getPlan());
+      parent.addChildBlocks(child.getChildBlockMap());
     }
     return parent;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/MasterPlan.java
index 3f78e5b..5ae4faf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/MasterPlan.java
@@ -21,25 +21,21 @@
  */
 package tajo.engine.planner.global;
 
-import tajo.master.SubQuery;
+import tajo.master.ExecutionBlock;
 
 public class MasterPlan {
-  private SubQuery root;
+  private ExecutionBlock root;
   private String outputTableName;
-  
-  public MasterPlan() {
-    root = null;
-  }
-  
-  public MasterPlan(SubQuery root) {
+
+  public MasterPlan(ExecutionBlock root) {
     setRoot(root);
   }
   
-  public void setRoot(SubQuery root) {
+  public void setRoot(ExecutionBlock root) {
     this.root = root;
   }
   
-  public SubQuery getRoot() {
+  public ExecutionBlock getRoot() {
     return this.root;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/logical/StoreTableNode.java
index e185f93..8c4d592 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/logical/StoreTableNode.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.gson.annotations.Expose;
 import tajo.catalog.Column;
 import tajo.catalog.Options;
-import tajo.master.SubQuery;
+import tajo.master.ExecutionBlock.PartitionType;
 import tajo.util.TUtil;
 
 import static tajo.catalog.proto.CatalogProtos.StoreType;
@@ -30,7 +30,7 @@ import static tajo.catalog.proto.CatalogProtos.StoreType;
 public class StoreTableNode extends UnaryNode implements Cloneable {
   @Expose private String tableName;
   @Expose private StoreType storageType = StoreType.CSV;
-  @Expose private SubQuery.PARTITION_TYPE partitionType;
+  @Expose private PartitionType partitionType;
   @Expose private int numPartitions;
   @Expose private Column [] partitionKeys;
   @Expose private boolean local;
@@ -79,12 +79,12 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
   }
 
   public final void setListPartition() {
-    this.partitionType = SubQuery.PARTITION_TYPE.LIST;
+    this.partitionType = PartitionType.LIST;
     this.partitionKeys = null;
     this.numPartitions = 0;
   }
   
-  public final void setPartitions(SubQuery.PARTITION_TYPE type, Column [] keys, int numPartitions) {
+  public final void setPartitions(PartitionType type, Column [] keys, int numPartitions) {
     Preconditions.checkArgument(keys.length >= 0, 
         "At least one partition key must be specified.");
     Preconditions.checkArgument(numPartitions > 0,
@@ -95,7 +95,7 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
     this.numPartitions = numPartitions;
   }
 
-  public SubQuery.PARTITION_TYPE getPartitionType() {
+  public PartitionType getPartitionType() {
     return this.partitionType;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlock.java
new file mode 100644
index 0000000..04d3bc7
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlock.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package tajo.master;
+
+import com.google.common.base.Preconditions;
+import tajo.SubQueryId;
+import tajo.catalog.Schema;
+import tajo.engine.planner.logical.*;
+
+import java.util.*;
+
+/**
+ * A distributed execution plan (DEP) is a directed acyclic graph (DAG) of ExecutionBlocks.
+ * An ExecutionBlock is a basic execution unit that can be distributed across a number of nodes.
+ * The ExecutionBlock class contains input information (e.g., child execution blocks or input
+ * tables) and output information (e.g., partition type, partition key, and partition number).
+ * In addition, it includes a logical plan to be executed on each node.
+ */
+public class ExecutionBlock {
+
+  public static enum PartitionType {
+    /** for hash partitioning */
+    HASH,
+    LIST,
+    /** for map-side join */
+    BROADCAST,
+    /** for range partitioning */
+    RANGE
+  }
+
+  private SubQueryId subQueryId;
+  private LogicalNode plan = null;
+  private StoreTableNode store = null;
+  private List<ScanNode> scanlist = new ArrayList<ScanNode>();
+  private ExecutionBlock parent;
+  private Map<ScanNode, ExecutionBlock> childSubQueries = new HashMap<ScanNode, ExecutionBlock>();
+  private PartitionType outputType;
+  private boolean hasJoinPlan;
+  private boolean hasUnionPlan;
+
+  public ExecutionBlock(SubQueryId subQueryId) {
+    this.subQueryId = subQueryId;
+  }
+
+  public SubQueryId getId() {
+    return subQueryId;
+  }
+
+  public String getOutputName() {
+    return store.getTableName();
+  }
+
+  public void setPartitionType(PartitionType partitionType) {
+    this.outputType = partitionType;
+  }
+
+  public PartitionType getPartitionType() {
+    return outputType;
+  }
+
+  public void setPlan(LogicalNode plan) {
+    hasJoinPlan = false;
+    Preconditions.checkArgument(plan.getType() == ExprType.STORE
+        || plan.getType() == ExprType.CREATE_INDEX);
+
+    this.plan = plan;
+    if (plan instanceof StoreTableNode) {
+      store = (StoreTableNode) plan;
+    } else {
+      store = (StoreTableNode) ((IndexWriteNode)plan).getSubNode();
+    }
+
+    LogicalNode node = plan;
+    ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
+    s.add(node);
+    while (!s.isEmpty()) {
+      node = s.remove(s.size()-1);
+      if (node instanceof UnaryNode) {
+        UnaryNode unary = (UnaryNode) node;
+        s.add(s.size(), unary.getSubNode());
+      } else if (node instanceof BinaryNode) {
+        BinaryNode binary = (BinaryNode) node;
+        if (binary.getType() == ExprType.JOIN) {
+          hasJoinPlan = true;
+        } else if (binary.getType() == ExprType.UNION) {
+          hasUnionPlan = true;
+        }
+        s.add(s.size(), binary.getOuterNode());
+        s.add(s.size(), binary.getInnerNode());
+      } else if (node instanceof ScanNode) {
+        scanlist.add((ScanNode)node);
+      }
+    }
+  }
+
+
+  public LogicalNode getPlan() {
+    return plan;
+  }
+
+  public boolean hasParentBlock() {
+    return parent != null;
+  }
+
+  public ExecutionBlock getParentBlock() {
+    return parent;
+  }
+
+  public void setParentBlock(ExecutionBlock parent) {
+    this.parent = parent;
+  }
+
+  public boolean hasChildBlock() {
+    return childSubQueries.size() > 0;
+  }
+
+  public ExecutionBlock getChildBlock(ScanNode scanNode) {
+    return childSubQueries.get(scanNode);
+  }
+
+  public Collection<ExecutionBlock> getChildBlocks() {
+    return Collections.unmodifiableCollection(childSubQueries.values());
+  }
+
+  public Map<ScanNode, ExecutionBlock> getChildBlockMap() {
+    return childSubQueries;
+  }
+
+  public void addChildBlock(ScanNode scanNode, ExecutionBlock child) {
+    childSubQueries.put(scanNode, child);
+  }
+
+  public int getChildNum() {
+    return childSubQueries.size();
+  }
+
+  public void removeChildBlock(ScanNode scanNode) {
+    scanlist.remove(scanNode);
+    this.childSubQueries.remove(scanNode);
+  }
+
+  public void addChildBlocks(Map<ScanNode, ExecutionBlock> childBlocks) {
+    childSubQueries.putAll(childBlocks);
+  }
+
+  public boolean isLeafBlock() {
+    return childSubQueries.size() == 0;
+  }
+
+  public StoreTableNode getStoreTableNode() {
+    return store;
+  }
+
+  public ScanNode [] getScanNodes() {
+    return this.scanlist.toArray(new ScanNode[scanlist.size()]);
+  }
+
+  public Schema getOutputSchema() {
+    return store.getOutSchema();
+  }
+
+  public boolean hasJoin() {
+    return hasJoinPlan;
+  }
+
+  public boolean hasUnion() {
+    return hasUnionPlan;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlockCursor.java
new file mode 100644
index 0000000..89e2a5b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlockCursor.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package tajo.master;
+
+import tajo.engine.planner.global.MasterPlan;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * A distributed execution plan (DEP) is a directed acyclic graph (DAG) of ExecutionBlocks.
+ * This class is a pointer to an ExecutionBlock that the query engine should execute.
+ * Each call of nextBlock() retrieves the next ExecutionBlock in postfix order.
+ */
+public class ExecutionBlockCursor {
+  private ArrayList<ExecutionBlock> orderedBlocks = new ArrayList<ExecutionBlock>();
+  private int cursor = 0;
+
+  public ExecutionBlockCursor(MasterPlan plan) {
+    buildOrder(plan.getRoot());
+  }
+
+  private void buildOrder(ExecutionBlock current) {
+    if (current.hasChildBlock()) {
+      if (current.getChildNum() == 1) {
+        ExecutionBlock block = current.getChildBlocks().iterator().next();
+        buildOrder(block);
+      } else {
+        Iterator<ExecutionBlock> it = current.getChildBlocks().iterator();
+        ExecutionBlock outer = it.next();
+        ExecutionBlock inner = it.next();
+
+        // Switch between outer and inner
+        // if an inner has a child and an outer doesn't.
+        // It is for left-deep-first search.
+        if (!outer.hasChildBlock() && inner.hasChildBlock()) {
+          ExecutionBlock tmp = outer;
+          outer = inner;
+          inner = tmp;
+        }
+
+        buildOrder(outer);
+        buildOrder(inner);
+      }
+    }
+    orderedBlocks.add(current);
+  }
+
+  public boolean hasNext() {
+    return cursor < orderedBlocks.size();
+  }
+
+  public ExecutionBlock nextBlock() {
+    return orderedBlocks.get(cursor++);
+  }
+
+  public ExecutionBlock peek() {
+    return orderedBlocks.get(cursor);
+  }
+
+  public ExecutionBlock peek(int skip) {
+    return  orderedBlocks.get(cursor + skip);
+  }
+
+  public void reset() {
+    cursor = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
index 8d3b400..2709c98 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
@@ -25,9 +25,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.jboss.netty.handler.codec.http.QueryStringDecoder;
 import tajo.QueryId;
@@ -36,18 +33,14 @@ import tajo.QueryUnitAttemptId;
 import tajo.SubQueryId;
 import tajo.catalog.*;
 import tajo.catalog.proto.CatalogProtos.StoreType;
-import tajo.catalog.statistics.TableStat;
 import tajo.common.exception.NotImplementedException;
 import tajo.conf.TajoConf;
-import tajo.engine.MasterWorkerProtos.Partition;
 import tajo.engine.parser.QueryBlock.FromTable;
 import tajo.engine.planner.PlannerUtil;
-import tajo.engine.planner.RangePartitionAlgorithm;
-import tajo.engine.planner.UniformRangePartition;
 import tajo.engine.planner.global.MasterPlan;
 import tajo.engine.planner.logical.*;
 import tajo.engine.utils.TupleUtil;
-import tajo.master.SubQuery.PARTITION_TYPE;
+import tajo.master.ExecutionBlock.PartitionType;
 import tajo.storage.Fragment;
 import tajo.storage.StorageManager;
 import tajo.storage.TupleRange;
@@ -55,9 +48,7 @@ import tajo.util.TajoIdUtils;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.math.BigDecimal;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.*;
 import java.util.Map.Entry;
 
@@ -133,20 +124,6 @@ public class GlobalPlanner {
     PlannerUtil.insertNode(parent, store);
     return store;
   }
-
-  public int getTaskNum(SubQuery subQuery) {
-    int numTasks;
-    GroupbyNode grpNode = (GroupbyNode) PlannerUtil.findTopNode(
-        subQuery.getLogicalPlan(), ExprType.GROUP_BY);
-    if (subQuery.getParentQuery() == null && grpNode != null
-        && grpNode.getGroupingColumns().length == 0) {
-      numTasks = 1;
-    } else {
-      // TODO - to be improved
-      numTasks = 32;
-    }
-    return numTasks;
-  }
   
   /**
    * Transforms a logical plan to a two-phase plan. 
@@ -316,8 +293,8 @@ public class GlobalPlanner {
     return logicalPlan;
   }
   
-  private Map<StoreTableNode, SubQuery> convertMap =
-      new HashMap<StoreTableNode, SubQuery>();
+  private Map<StoreTableNode, ExecutionBlock> convertMap =
+      new HashMap<StoreTableNode, ExecutionBlock>();
   
   /**
    * Logical plan을 후위 탐색하면서 SubQuery 생성
@@ -327,7 +304,7 @@ public class GlobalPlanner {
    */
   private void recursiveBuildSubQuery(LogicalNode node)
       throws IOException {
-    SubQuery subQuery;
+    ExecutionBlock subQuery;
     StoreTableNode store;
     if (node instanceof UnaryNode) {
       recursiveBuildSubQuery(((UnaryNode) node).getSubNode());
@@ -340,44 +317,41 @@ public class GlobalPlanner {
         } else {
           id = QueryIdFactory.newSubQueryId(queryId);
         }
-        subQuery = new SubQuery(id, sm, this);
+        subQuery = new ExecutionBlock(id);
 
         switch (store.getSubNode().getType()) {
         case BST_INDEX_SCAN:
         case SCAN:  // store - scan
           subQuery = makeScanSubQuery(subQuery);
-          subQuery.setLogicalPlan(node);
+          subQuery.setPlan(node);
           break;
         case SELECTION:
         case PROJECTION:
         case LIMIT:
           subQuery = makeUnarySubQuery(store, node, subQuery);
-          subQuery.setLogicalPlan(node);
+          subQuery.setPlan(node);
           break;
         case GROUP_BY:
           subQuery = makeGroupbySubQuery(store, node, subQuery);
-          subQuery.setLogicalPlan(node);
+          subQuery.setPlan(node);
           break;
         case SORT:
           subQuery = makeSortSubQuery(store, node, subQuery);
-          subQuery.setLogicalPlan(node);
+          subQuery.setPlan(node);
           break;
         case JOIN:  // store - join
           subQuery = makeJoinSubQuery(store, node, subQuery);
-          subQuery.setLogicalPlan(node);
+          subQuery.setPlan(node);
           break;
         case UNION:
           subQuery = makeUnionSubQuery(store, node, subQuery);
-          subQuery.setLogicalPlan(node);
+          subQuery.setPlan(node);
           break;
         default:
           subQuery = null;
           break;
         }
 
-        if (!subQuery.hasChildQuery()) {
-          subQuery.setLeafQuery();
-        }
         convertMap.put(store, subQuery);
       }
     } else if (node instanceof BinaryNode) {
@@ -390,18 +364,9 @@ public class GlobalPlanner {
     }
   }
   
-  private SubQuery makeScanSubQuery(SubQuery unit) {
-    unit.setOutputType(PARTITION_TYPE.LIST);
-    return unit;
-  }
-  
-  private SubQuery makeBSTIndexUnit(LogicalNode plan, SubQuery unit) {
-    switch(((IndexWriteNode)plan).getSubNode().getType()){
-    case SCAN:
-      unit = makeScanSubQuery(unit);
-      unit.setLogicalPlan(((IndexWriteNode)plan).getSubNode());
-    }
-    return unit;
+  private ExecutionBlock makeScanSubQuery(ExecutionBlock block) {
+    block.setPartitionType(PartitionType.LIST);
+    return block;
   }
   
   /**
@@ -413,10 +378,10 @@ public class GlobalPlanner {
    * @return
    * @throws IOException
    */
-  private SubQuery makeUnarySubQuery(StoreTableNode rootStore,
-                                     LogicalNode plan, SubQuery unit) throws IOException {
+  private ExecutionBlock makeUnarySubQuery(StoreTableNode rootStore,
+                                     LogicalNode plan, ExecutionBlock unit) throws IOException {
     ScanNode newScan;
-    SubQuery prev;
+    ExecutionBlock prev;
     UnaryNode unary = (UnaryNode) plan;
     UnaryNode child = (UnaryNode) unary.getSubNode();
     StoreTableNode prevStore = (StoreTableNode)child.getSubNode();
@@ -429,12 +394,12 @@ public class GlobalPlanner {
     prev = convertMap.get(prevStore);
 
     if (prev != null) {
-      prev.setParentQuery(unit);
-      unit.addChildQuery(newScan, prev);
-      prev.setOutputType(PARTITION_TYPE.LIST);
+      prev.setParentBlock(unit);
+      unit.addChildBlock(newScan, prev);
+      prev.setPartitionType(PartitionType.LIST);
     }
 
-    unit.setOutputType(PARTITION_TYPE.LIST);
+    unit.setPartitionType(PartitionType.LIST);
 
     return unit;
   }
@@ -448,13 +413,13 @@ public class GlobalPlanner {
    * @return
    * @throws IOException
    */
-  private SubQuery makeGroupbySubQuery(StoreTableNode rootStore,
-                                       LogicalNode plan, SubQuery unit) throws IOException {
+  private ExecutionBlock makeGroupbySubQuery(StoreTableNode rootStore,
+                                       LogicalNode plan, ExecutionBlock unit) throws IOException {
     UnaryNode unary = (UnaryNode) plan;
     UnaryNode unaryChild;
     StoreTableNode prevStore;
     ScanNode newScan;
-    SubQuery prev;
+    ExecutionBlock prev;
     unaryChild = (UnaryNode) unary.getSubNode();  // groupby
     ExprType curType = unaryChild.getType();
     if (unaryChild.getSubNode().getType() == ExprType.STORE) {
@@ -468,30 +433,30 @@ public class GlobalPlanner {
       ((UnaryNode) unary.getSubNode()).setSubNode(newScan);
       prev = convertMap.get(prevStore);
       if (prev != null) {
-        prev.setParentQuery(unit);
-        unit.addChildQuery(newScan, prev);
+        prev.setParentBlock(unit);
+        unit.addChildBlock(newScan, prev);
       }
 
       if (unaryChild.getSubNode().getType() == curType) {
         // the second phase
-        unit.setOutputType(PARTITION_TYPE.LIST);
+        unit.setPartitionType(PartitionType.LIST);
         if (prev != null) {
-          prev.setOutputType(PARTITION_TYPE.HASH);
+          prev.setPartitionType(PartitionType.HASH);
         }
       } else {
         // the first phase
-        unit.setOutputType(PARTITION_TYPE.HASH);
+        unit.setPartitionType(PartitionType.HASH);
         if (prev != null) {
-          prev.setOutputType(PARTITION_TYPE.LIST);
+          prev.setPartitionType(PartitionType.LIST);
         }
       }
     } else if (unaryChild.getSubNode().getType() == ExprType.SCAN) {
       // the first phase
       // store - groupby - scan
-      unit.setOutputType(PARTITION_TYPE.HASH);
+      unit.setPartitionType(PartitionType.HASH);
     } else if (unaryChild.getSubNode().getType() == ExprType.UNION) {
       _handleUnionNode(rootStore, (UnionNode)unaryChild.getSubNode(), unit, 
-          null, PARTITION_TYPE.LIST);
+          null, PartitionType.LIST);
     } else {
       // error
     }
@@ -499,21 +464,21 @@ public class GlobalPlanner {
   }
   
   /**
-   * 
-   * 
+   *
+   *
    * @param rootStore 생성할 SubQuery의 store
    * @param plan logical plan
    * @param unit 생성할 SubQuery
    * @return
    * @throws IOException
    */
-  private SubQuery makeUnionSubQuery(StoreTableNode rootStore,
-                                     LogicalNode plan, SubQuery unit) throws IOException {
+  private ExecutionBlock makeUnionSubQuery(StoreTableNode rootStore,
+                                     LogicalNode plan, ExecutionBlock unit) throws IOException {
     UnaryNode unary = (UnaryNode) plan;
     StoreTableNode outerStore, innerStore;
-    SubQuery prev;
+    ExecutionBlock prev;
     UnionNode union = (UnionNode) unary.getSubNode();
-    unit.setOutputType(PARTITION_TYPE.LIST);
+    unit.setPartitionType(PartitionType.LIST);
     
     if (union.getOuterNode().getType() == ExprType.STORE) {
       outerStore = (StoreTableNode) union.getOuterNode();
@@ -523,13 +488,12 @@ public class GlobalPlanner {
       prev = convertMap.get(outerStore);
       if (prev != null) {
         prev.getStoreTableNode().setTableName(rootStore.getTableName());
-        prev.setOutputType(PARTITION_TYPE.LIST);
-        prev.setParentQuery(unit);
-        prev.setLeafQuery();
-        unit.addChildQuery((ScanNode)union.getOuterNode(), prev);
+        prev.setPartitionType(PartitionType.LIST);
+        prev.setParentBlock(unit);
+        unit.addChildBlock((ScanNode) union.getOuterNode(), prev);
       }
     } else if (union.getOuterNode().getType() == ExprType.UNION) {
-      _handleUnionNode(rootStore, union, unit, null, PARTITION_TYPE.LIST);
+      _handleUnionNode(rootStore, union, unit, null, PartitionType.LIST);
     }
     
     if (union.getInnerNode().getType() == ExprType.STORE) {
@@ -540,26 +504,25 @@ public class GlobalPlanner {
       prev = convertMap.get(innerStore);
       if (prev != null) {
         prev.getStoreTableNode().setTableName(rootStore.getTableName());
-        prev.setOutputType(PARTITION_TYPE.LIST);
-        prev.setParentQuery(unit);
-        prev.setLeafQuery();
-        unit.addChildQuery((ScanNode)union.getInnerNode(), prev);
+        prev.setPartitionType(PartitionType.LIST);
+        prev.setParentBlock(unit);
+        unit.addChildBlock((ScanNode) union.getInnerNode(), prev);
       }
     } else if (union.getInnerNode().getType() == ExprType.UNION) {
-      _handleUnionNode(rootStore, union, unit, null, PARTITION_TYPE.LIST);
+      _handleUnionNode(rootStore, union, unit, null, PartitionType.LIST);
     }
 
     return unit;
   }
 
-  private SubQuery makeSortSubQuery(StoreTableNode rootStore,
-                                    LogicalNode plan, SubQuery unit) throws IOException {
+  private ExecutionBlock makeSortSubQuery(StoreTableNode rootStore,
+                                    LogicalNode plan, ExecutionBlock unit) throws IOException {
 
     UnaryNode unary = (UnaryNode) plan;
     UnaryNode unaryChild;
     StoreTableNode prevStore;
     ScanNode newScan;
-    SubQuery prev;
+    ExecutionBlock prev;
     unaryChild = (UnaryNode) unary.getSubNode();  // groupby
     ExprType curType = unaryChild.getType();
     if (unaryChild.getSubNode().getType() == ExprType.STORE) {
@@ -572,44 +535,44 @@ public class GlobalPlanner {
       ((UnaryNode) unary.getSubNode()).setSubNode(newScan);
       prev = convertMap.get(prevStore);
       if (prev != null) {
-        prev.setParentQuery(unit);
-        unit.addChildQuery(newScan, prev);
+        prev.setParentBlock(unit);
+        unit.addChildBlock(newScan, prev);
         if (unaryChild.getSubNode().getType() == curType) {
           // TODO - this is duplicated code
-          prev.setOutputType(PARTITION_TYPE.RANGE);
+          prev.setPartitionType(PartitionType.RANGE);
         } else {
-          prev.setOutputType(PARTITION_TYPE.LIST);
+          prev.setPartitionType(PartitionType.LIST);
         }
       }
       if (unaryChild.getSubNode().getType() == curType) {
         // the second phase
-        unit.setOutputType(PARTITION_TYPE.LIST);
+        unit.setPartitionType(PartitionType.LIST);
       } else {
         // the first phase
-        unit.setOutputType(PARTITION_TYPE.HASH);
+        unit.setPartitionType(PartitionType.HASH);
       }
     } else if (unaryChild.getSubNode().getType() == ExprType.SCAN) {
       // the first phase
       // store - sort - scan
-      unit.setOutputType(PARTITION_TYPE.RANGE);
+      unit.setPartitionType(PartitionType.RANGE);
     } else if (unaryChild.getSubNode().getType() == ExprType.UNION) {
       _handleUnionNode(rootStore, (UnionNode)unaryChild.getSubNode(), unit,
-          null, PARTITION_TYPE.LIST);
+          null, PartitionType.LIST);
     } else {
       // error
     }
     return unit;
   }
   
-  private SubQuery makeJoinSubQuery(StoreTableNode rootStore,
-                                    LogicalNode plan, SubQuery unit) throws IOException {
+  private ExecutionBlock makeJoinSubQuery(StoreTableNode rootStore,
+                                    LogicalNode plan, ExecutionBlock unit) throws IOException {
     UnaryNode unary = (UnaryNode)plan;
     StoreTableNode outerStore, innerStore;
-    SubQuery prev;
+    ExecutionBlock prev;
     JoinNode join = (JoinNode) unary.getSubNode();
     Schema outerSchema = join.getOuterNode().getOutSchema();
     Schema innerSchema = join.getInnerNode().getOutSchema();
-    unit.setOutputType(PARTITION_TYPE.LIST);
+    unit.setPartitionType(PartitionType.LIST);
 
     List<Column> outerCollist = new ArrayList<Column>();
     List<Column> innerCollist = new ArrayList<Column>();
@@ -639,14 +602,14 @@ public class GlobalPlanner {
       insertOuterScan(join, outerStore.getTableName(), outerMeta);
       prev = convertMap.get(outerStore);
       if (prev != null) {
-        prev.setOutputType(PARTITION_TYPE.HASH);
-        prev.setParentQuery(unit);
-        unit.addChildQuery((ScanNode)join.getOuterNode(), prev);
+        prev.setPartitionType(PartitionType.HASH);
+        prev.setParentBlock(unit);
+        unit.addChildBlock((ScanNode) join.getOuterNode(), prev);
       }
-      outerStore.setPartitions(PARTITION_TYPE.HASH, outerCols, 32);
+      outerStore.setPartitions(PartitionType.HASH, outerCols, 32);
     } else if (join.getOuterNode().getType() == ExprType.UNION) {
       _handleUnionNode(rootStore, (UnionNode)join.getOuterNode(), unit, 
-          outerCols, PARTITION_TYPE.HASH);
+          outerCols, PartitionType.HASH);
     } else {
 
     }
@@ -659,14 +622,14 @@ public class GlobalPlanner {
       insertInnerScan(join, innerStore.getTableName(), innerMeta);
       prev = convertMap.get(innerStore);
       if (prev != null) {
-        prev.setOutputType(PARTITION_TYPE.HASH);
-        prev.setParentQuery(unit);
-        unit.addChildQuery((ScanNode)join.getInnerNode(), prev);
+        prev.setPartitionType(PartitionType.HASH);
+        prev.setParentBlock(unit);
+        unit.addChildBlock((ScanNode) join.getInnerNode(), prev);
       }
-      innerStore.setPartitions(PARTITION_TYPE.HASH, innerCols, 32);
+      innerStore.setPartitions(PartitionType.HASH, innerCols, 32);
     } else if (join.getInnerNode().getType() == ExprType.UNION) {
       _handleUnionNode(rootStore, (UnionNode)join.getInnerNode(), unit,
-          innerCols, PARTITION_TYPE.HASH);
+          innerCols, PartitionType.HASH);
     }
     
     return unit;
@@ -683,11 +646,11 @@ public class GlobalPlanner {
    * @throws IOException
    */
   private void _handleUnionNode(StoreTableNode rootStore, UnionNode union, 
-      SubQuery cur, Column[] cols, PARTITION_TYPE prevOutputType)
+      ExecutionBlock cur, Column[] cols, PartitionType prevOutputType)
           throws IOException {
     StoreTableNode store;
     TableMeta meta;
-    SubQuery prev;
+    ExecutionBlock prev;
     
     if (union.getOuterNode().getType() == ExprType.STORE) {
       store = (StoreTableNode) union.getOuterNode();
@@ -696,12 +659,12 @@ public class GlobalPlanner {
       prev = convertMap.get(store);
       if (prev != null) {
         prev.getStoreTableNode().setTableName(rootStore.getTableName());
-        prev.setOutputType(prevOutputType);
-        prev.setParentQuery(cur);
-        cur.addChildQuery((ScanNode)union.getOuterNode(), prev);
+        prev.setPartitionType(prevOutputType);
+        prev.setParentBlock(cur);
+        cur.addChildBlock((ScanNode) union.getOuterNode(), prev);
       }
       if (cols != null) {
-        store.setPartitions(PARTITION_TYPE.LIST, cols, 32);
+        store.setPartitions(PartitionType.LIST, cols, 32);
       }
     } else if (union.getOuterNode().getType() == ExprType.UNION) {
       _handleUnionNode(rootStore, (UnionNode)union.getOuterNode(), cur, cols, 
@@ -715,12 +678,12 @@ public class GlobalPlanner {
       prev = convertMap.get(store);
       if (prev != null) {
         prev.getStoreTableNode().setTableName(rootStore.getTableName());
-        prev.setOutputType(prevOutputType);
-        prev.setParentQuery(cur);
-        cur.addChildQuery((ScanNode)union.getInnerNode(), prev);
+        prev.setPartitionType(prevOutputType);
+        prev.setParentBlock(cur);
+        cur.addChildBlock((ScanNode) union.getInnerNode(), prev);
       }
       if (cols != null) {
-        store.setPartitions(PARTITION_TYPE.LIST, cols, 32);
+        store.setPartitions(PartitionType.LIST, cols, 32);
       }
     } else if (union.getInnerNode().getType() == ExprType.UNION) {
       _handleUnionNode(rootStore, (UnionNode)union.getInnerNode(), cur, cols, 
@@ -729,20 +692,19 @@ public class GlobalPlanner {
   }
 
   @VisibleForTesting
-  public SubQuery createMultilevelGroupby(
-      SubQuery firstPhaseGroupby, Column[] keys)
+  public ExecutionBlock createMultilevelGroupby(
+      ExecutionBlock firstPhaseGroupby, Column[] keys)
       throws CloneNotSupportedException, IOException {
-    SubQuery secondPhaseGroupby = firstPhaseGroupby.getParentQuery();
+    ExecutionBlock secondPhaseGroupby = firstPhaseGroupby.getParentBlock();
     Preconditions.checkState(secondPhaseGroupby.getScanNodes().length == 1);
 
     ScanNode secondScan = secondPhaseGroupby.getScanNodes()[0];
     GroupbyNode secondGroupby = (GroupbyNode) secondPhaseGroupby.
         getStoreTableNode().getSubNode();
-    SubQuery newPhaseGroupby = new SubQuery(
-        QueryIdFactory.newSubQueryId(
-            firstPhaseGroupby.getId().getQueryId()), sm, this);
+    ExecutionBlock newPhaseGroupby = new ExecutionBlock(
+        QueryIdFactory.newSubQueryId(firstPhaseGroupby.getId().getQueryId()));
     LogicalNode tmp = PlannerUtil.findTopParentNode(
-        firstPhaseGroupby.getLogicalPlan(), ExprType.GROUP_BY);
+        firstPhaseGroupby.getPlan(), ExprType.GROUP_BY);
     GroupbyNode firstGroupby;
     if (tmp instanceof UnaryNode) {
       firstGroupby = (GroupbyNode) ((UnaryNode)tmp).getSubNode();
@@ -775,9 +737,9 @@ public class GlobalPlanner {
         secondGroupby.getTargets());
     newGroupby.setSubNode(newScan);
     newStore.setSubNode(newGroupby);
-    newPhaseGroupby.setLogicalPlan(newStore);
+    newPhaseGroupby.setPlan(newStore);
 
-    secondPhaseGroupby.removeChildQuery(secondScan);
+    secondPhaseGroupby.removeChildBlock(secondScan);
 
     // update the scan node of last phase
     secondScan = GlobalPlannerUtils.newScanPlan(secondScan.getInSchema(),
@@ -785,15 +747,15 @@ public class GlobalPlanner {
         sm.getTablePath(newPhaseGroupby.getOutputName()));
     secondScan.setLocal(true);
     secondGroupby.setSubNode(secondScan);
-    secondPhaseGroupby.setLogicalPlan(secondPhaseGroupby.getLogicalPlan());
+    secondPhaseGroupby.setPlan(secondPhaseGroupby.getPlan());
 
     // insert the new SubQuery
     // between the first phase and the second phase
-    secondPhaseGroupby.addChildQuery(secondScan, newPhaseGroupby);
-    newPhaseGroupby.addChildQuery(newPhaseGroupby.getScanNodes()[0],
+    secondPhaseGroupby.addChildBlock(secondScan, newPhaseGroupby);
+    newPhaseGroupby.addChildBlock(newPhaseGroupby.getScanNodes()[0],
         firstPhaseGroupby);
-    newPhaseGroupby.setParentQuery(secondPhaseGroupby);
-    firstPhaseGroupby.setParentQuery(newPhaseGroupby);
+    newPhaseGroupby.setParentBlock(secondPhaseGroupby);
+    firstPhaseGroupby.setParentBlock(newPhaseGroupby);
 
     return newPhaseGroupby;
   }
@@ -823,13 +785,13 @@ public class GlobalPlanner {
   private MasterPlan convertToGlobalPlan(IndexWriteNode index,
                                          LogicalNode logicalPlan) throws IOException {
     recursiveBuildSubQuery(logicalPlan);
-    SubQuery root;
+    ExecutionBlock root;
     
     if (index != null) {
       SubQueryId id = QueryIdFactory.newSubQueryId(queryId);
-      SubQuery unit = new SubQuery(id, sm, this);
+      ExecutionBlock unit = new ExecutionBlock(id);
       root = makeScanSubQuery(unit);
-      root.setLogicalPlan(index);
+      root.setPlan(index);
     } else {
       root = convertMap.get(((LogicalRootNode)logicalPlan).getSubNode());
       root.getStoreTableNode().setLocal(false);
@@ -837,211 +799,6 @@ public class GlobalPlanner {
     return new MasterPlan(root);
   }
   
-  public SubQuery setPartitionNumberForTwoPhase(SubQuery subQuery, final int n) {
-    Column[] keys = null;
-    // if the next query is join,
-    // set the partition number for the current logicalUnit
-    // TODO: the union handling is required when a join has unions as its child
-    SubQuery parentQueryUnit = subQuery.getParentQuery();
-    if (parentQueryUnit != null) {
-      if (parentQueryUnit.getStoreTableNode().getSubNode().getType() == ExprType.JOIN) {
-        subQuery.getStoreTableNode().setPartitions(subQuery.getOutputType(),
-            subQuery.getStoreTableNode().getPartitionKeys(), n);
-        keys = subQuery.getStoreTableNode().getPartitionKeys();
-      }
-    }
-
-    StoreTableNode store = subQuery.getStoreTableNode();
-    // set the partition number for group by and sort
-    if (subQuery.getOutputType() == PARTITION_TYPE.HASH) {
-      if (store.getSubNode().getType() == ExprType.GROUP_BY) {
-        GroupbyNode groupby = (GroupbyNode)store.getSubNode();
-        keys = groupby.getGroupingColumns();
-      }
-    } else if (subQuery.getOutputType() == PARTITION_TYPE.RANGE) {
-      if (store.getSubNode().getType() == ExprType.SORT) {
-        SortNode sort = (SortNode)store.getSubNode();
-        keys = new Column[sort.getSortKeys().length];
-        for (int i = 0; i < keys.length; i++) {
-          keys[i] = sort.getSortKeys()[i].getSortKey();
-        }
-      }
-    }
-    if (keys != null) {
-      if (keys.length == 0) {
-        store.setPartitions(subQuery.getOutputType(), new Column[]{}, 1);
-      } else {
-        store.setPartitions(subQuery.getOutputType(), keys, n);
-      }
-    } else {
-      store.setListPartition();
-    }
-    return subQuery;
-  }
-
-  /**
-   * 입력 받은 SubQuery를 QueryUnit들로 localize
-   * 
-   * @param subQuery localize할 SubQuery
-   * @param maxTaskNum localize된 QueryUnit의 최대 개수
-   * @return
-   * @throws IOException
-   * @throws URISyntaxException
-   */
-  public QueryUnit[] localize(SubQuery subQuery, int maxTaskNum)
-      throws IOException, URISyntaxException {
-    FileStatus[] files;
-    Fragment[] frags;
-    List<Fragment> fragList;
-    List<URI> uriList;
-    // fragments and fetches are maintained for each scan of the SubQuery
-    Map<ScanNode, List<Fragment>> fragMap = new HashMap<ScanNode, List<Fragment>>();
-    Map<ScanNode, List<URI>> fetchMap = new HashMap<ScanNode, List<URI>>();
-    
-    // set partition numbers for two phase algorithms
-    // TODO: the following line might occur a bug. see the line 623
-    subQuery = setPartitionNumberForTwoPhase(subQuery, maxTaskNum);
-
-    SortSpec [] sortSpecs = null;
-    Schema sortSchema;
-    
-    // make fetches and fragments for each scan
-    Path tablepath;
-    ScanNode[] scans = subQuery.getScanNodes();
-    for (ScanNode scan : scans) {
-      if (scan.getTableId().startsWith(QueryId.PREFIX)) {
-        tablepath = sm.getTablePath(scan.getTableId());
-      } else {
-        tablepath = catalog.getTableDesc(scan.getTableId()).getPath();
-      }
-      if (scan.isLocal()) {
-        SubQuery prev = subQuery.getChildIterator().next();
-        TableStat stat = prev.getStats();
-        if (stat.getNumRows() == 0) {
-          return new QueryUnit[0];
-        }
-        // make fetches from the previous query units
-        uriList = new ArrayList<URI>();
-        fragList = new ArrayList<Fragment>();
-
-        if (prev.getOutputType() == PARTITION_TYPE.RANGE) {
-          StoreTableNode store = (StoreTableNode) prev.getLogicalPlan();
-          SortNode sort = (SortNode) store.getSubNode();
-          sortSpecs = sort.getSortKeys();
-          sortSchema = PlannerUtil.sortSpecsToSchema(sort.getSortKeys());
-
-          // calculate the number of tasks based on the data size
-          int mb = (int) Math.ceil((double)stat.getNumBytes() / 1048576);
-          LOG.info("Total size of intermediate data is approximately " + mb + " MB");
-
-          maxTaskNum = (int) Math.ceil((double)mb / 64); // determine the number of task by considering 1 task per 64MB
-          LOG.info("The desired number of tasks is set to " + maxTaskNum);
-
-          // calculate the number of maximum query ranges
-          TupleRange mergedRange =
-              TupleUtil.columnStatToRange(sort.getOutSchema(),
-                  sortSchema, stat.getColumnStats());
-          RangePartitionAlgorithm partitioner =
-              new UniformRangePartition(sortSchema, mergedRange);
-          BigDecimal card = partitioner.getTotalCardinality();
-
-          // if the number of the range cardinality is less than the desired number of tasks,
-          // we set the the number of tasks to the number of range cardinality.
-          if (card.compareTo(new BigDecimal(maxTaskNum)) < 0) {
-            LOG.info("The range cardinality (" + card
-                + ") is less then the desired number of tasks (" + maxTaskNum + ")");
-            maxTaskNum = card.intValue();
-          }
-
-          LOG.info("Try to divide " + mergedRange + " into " + maxTaskNum +
-              " sub ranges (total units: " + maxTaskNum + ")");
-          TupleRange [] ranges = partitioner.partition(maxTaskNum);
-
-          String [] queries = TupleUtil.rangesToQueries(sortSpecs, ranges);
-          for (QueryUnit qu : subQuery.getChildQuery(scan).getQueryUnits()) {
-            for (Partition p : qu.getPartitions()) {
-              for (String query : queries) {
-                uriList.add(new URI(p.getFileName() + "&" + query));
-              }
-            }
-          }
-        } else {
-          SubQuery child = subQuery.getChildQuery(scan);
-          QueryUnit[] units;
-          if (child.getStoreTableNode().getSubNode().getType() ==
-              ExprType.UNION) {
-            List<QueryUnit> list = Lists.newArrayList();
-            for (ScanNode s : child.getScanNodes()) {
-              for (QueryUnit qu : child.getChildQuery(s).getQueryUnits()) {
-                list.add(qu);
-              }
-            }
-            units = new QueryUnit[list.size()];
-            units = list.toArray(units);
-          } else {
-            units = child.getQueryUnits();
-          }
-          for (QueryUnit qu : units) {
-            for (Partition p : qu.getPartitions()) {
-              uriList.add(new URI(p.getFileName()));
-//              System.out.println("Partition: " + uriList.get(uriList.size() - 1));
-            }
-          }
-        }
-        
-        fetchMap.put(scan, uriList);
-        // one fragment is shared by query units
-        Fragment frag = new Fragment(scan.getTableId(), tablepath,
-            TCatUtil.newTableMeta(scan.getInSchema(),StoreType.CSV),
-            0, 0, null);
-        fragList.add(frag);
-        fragMap.put(scan, fragList);
-      } else {
-        fragList = new ArrayList<Fragment>();
-        // set fragments for each scan
-        if (subQuery.hasChildQuery() &&
-            (subQuery.getChildQuery(scan).getOutputType() == PARTITION_TYPE.HASH ||
-            subQuery.getChildQuery(scan).getOutputType() == PARTITION_TYPE.RANGE)
-            ) {
-          files = sm.getFileSystem().listStatus(tablepath);
-        } else {
-          files = new FileStatus[1];
-          files[0] = sm.getFileSystem().getFileStatus(tablepath);
-        }
-        for (FileStatus file : files) {
-          // make fragments
-          if (scan.isBroadcast()) {
-            frags = sm.splitBroadcastTable(file.getPath());
-          } else {
-            frags = sm.split(file.getPath());
-          }
-          for (Fragment f : frags) {
-            // TODO: the fragment ID should be set before
-            f.setId(scan.getTableId());
-            fragList.add(f);
-          }
-        }
-        fragMap.put(scan, fragList);
-      }
-    }
-
-    List<QueryUnit> unitList = null;
-    if (scans.length == 1) {
-      unitList = makeUnaryQueryUnit(subQuery, maxTaskNum, fragMap, fetchMap,
-          sortSpecs);
-    } else if (scans.length == 2) {
-      unitList = makeBinaryQueryUnit(subQuery, maxTaskNum, fragMap, fetchMap);
-    }
-    // TODO: The partition number should be set here,
-    // because the number of query units is decided above.
-
-    QueryUnit[] units = new QueryUnit[unitList.size()];
-    units = unitList.toArray(units);
-    subQuery.setQueryUnits(unitList);
-    
-    return units;
-  }
-  
   /**
    * 2개의 scan을 가진 QueryUnit들에 fragment와 fetch를 할당
    * 
@@ -1054,13 +811,14 @@ public class GlobalPlanner {
   private List<QueryUnit> makeBinaryQueryUnit(SubQuery subQuery, final int n,
       Map<ScanNode, List<Fragment>> fragMap, 
       Map<ScanNode, List<URI>> fetchMap) {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    ScanNode[] scans = execBlock.getScanNodes();
     List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
     final int maxQueryUnitNum = n;
-    ScanNode[] scans = subQuery.getScanNodes();
     
-    if (subQuery.hasChildQuery()) {
-      SubQuery prev = subQuery.getChildQuery(scans[0]);
-      switch (prev.getOutputType()) {
+    if (execBlock.hasChildBlock()) {
+      ExecutionBlock prev = execBlock.getChildBlock(scans[0]);
+      switch (prev.getPartitionType()) {
       case BROADCAST:
         throw new NotImplementedException();
       case HASH:
@@ -1076,8 +834,6 @@ public class GlobalPlanner {
         throw new NotImplementedException();
       }
     } else {
-//      unitList = assignFragmentsByRoundRobin(unit, unitList, fragMap,
-//          maxQueryUnitNum);
       queryUnits = makeQueryUnitsForBinaryPlan(subQuery,
           queryUnits, fragMap);
     }
@@ -1088,12 +844,13 @@ public class GlobalPlanner {
   public List<QueryUnit> makeQueryUnitsForBinaryPlan(
       SubQuery subQuery, List<QueryUnit> queryUnits,
       Map<ScanNode, List<Fragment>> fragmentMap) {
+    ExecutionBlock execBlock = subQuery.getBlock();
     QueryUnit queryUnit;
-    if (subQuery.hasJoinPlan()) {
+    if (execBlock.hasJoin()) {
       // make query units for every composition of fragments of each scan
       Preconditions.checkArgument(fragmentMap.size()==2);
 
-      ScanNode [] scanNodes = subQuery.getScanNodes();
+      ScanNode [] scanNodes = execBlock.getScanNodes();
       String innerId = null, outerId = null;
 
       // If one relation is set to broadcast, it meaning that the relation
@@ -1148,79 +905,6 @@ public class GlobalPlanner {
 
     return queryUnits;
   }
-  
-  /**
-   * 1개의 scan을 가진 QueryUnit들에 대해 fragment와 fetch를 할당
-   * 
-   * @param subQuery
-   * @param n
-   * @param fragMap
-   * @param fetchMap
-   * @return
-   */
-  private List<QueryUnit> makeUnaryQueryUnit(SubQuery subQuery, int n,
-      Map<ScanNode, List<Fragment>> fragMap, 
-      Map<ScanNode, List<URI>> fetchMap, SortSpec[] sortSpecs) throws UnsupportedEncodingException {
-    List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
-    int maxQueryUnitNum;
-    ScanNode scan = subQuery.getScanNodes()[0];
-    maxQueryUnitNum = n;
-    if (subQuery.hasChildQuery()) {
-      SubQuery child = subQuery.getChildQuery(scan);
-      if (child.getStoreTableNode().getSubNode().getType() ==
-          ExprType.GROUP_BY) {
-        GroupbyNode groupby = (GroupbyNode) child.getStoreTableNode().
-            getSubNode();
-        if (groupby.getGroupingColumns().length == 0) {
-          maxQueryUnitNum = 1;
-        }
-      }
-      switch (child.getOutputType()) {
-        case BROADCAST:
-        throw new NotImplementedException();
-
-        case HASH:
-          if (scan.isLocal()) {
-            queryUnits = assignFetchesToUnaryByHash(subQuery,
-                queryUnits, scan, fetchMap.get(scan), maxQueryUnitNum);
-            queryUnits = assignEqualFragment(queryUnits, scan,
-                fragMap.get(scan).get(0));
-          } else {
-            throw new NotImplementedException();
-          }
-          break;
-        case RANGE:
-          if (scan.isLocal()) {
-            Schema rangeSchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
-            queryUnits = assignFetchesByRange(subQuery,
-                queryUnits, scan, fetchMap.get(scan),
-                maxQueryUnitNum, rangeSchema, sortSpecs[0].isAscending());
-            queryUnits = assignEqualFragment(queryUnits, scan,
-                fragMap.get(scan).get(0));
-          } else {
-            throw new NotImplementedException();
-          }
-          break;
-
-        case LIST:
-          if (scan.isLocal()) {
-            queryUnits = assignFetchesByRoundRobin(subQuery,
-                queryUnits, scan, fetchMap.get(scan), maxQueryUnitNum);
-            queryUnits = assignEqualFragment(queryUnits, scan,
-                fragMap.get(scan).get(0));
-          } else {
-            throw new NotImplementedException();
-          }
-          break;
-      }
-    } else {
-//      queryUnits = assignFragmentsByRoundRobin(subQuery, queryUnits, scan,
-//          fragMap.get(scan), maxQueryUnitNum);
-      queryUnits = makeQueryUnitsForEachFragment(subQuery,
-          queryUnits, scan, fragMap.get(scan));
-    }
-    return queryUnits;
-  }
 
   private List<QueryUnit> makeQueryUnitsForEachFragment(
       SubQuery subQuery, List<QueryUnit> queryUnits,
@@ -1235,18 +919,11 @@ public class GlobalPlanner {
   }
   
   private QueryUnit newQueryUnit(SubQuery subQuery) {
+    ExecutionBlock execBlock = subQuery.getBlock();
     QueryUnit unit = new QueryUnit(
-        QueryIdFactory.newQueryUnitId(subQuery.getId()), subQuery.isLeafQuery(),
-        subQuery.eventHandler);
-    unit.setLogicalPlan(subQuery.getLogicalPlan());
-    return unit;
-  }
-
-  private QueryUnit newQueryUnit(SubQuery subQuery, int taskId) {
-    QueryUnit unit = new QueryUnit(
-        QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), subQuery.isLeafQuery(),
+        QueryIdFactory.newQueryUnitId(subQuery.getId()), execBlock.isLeafBlock(),
         subQuery.eventHandler);
-    unit.setLogicalPlan(subQuery.getLogicalPlan());
+    unit.setLogicalPlan(execBlock.getPlan());
     return unit;
   }
   
@@ -1539,22 +1216,6 @@ public class GlobalPlanner {
   }
 
   /**
-   * Unary QueryUnit들에 broadcast partition된 fetch를 할당
-   *
-   * @param units
-   * @param scan
-   * @param uriList
-   */
-  private void assignFetchesByBroadcast(QueryUnit[] units, ScanNode scan, List<URI> uriList) {
-    for (URI uri : uriList) {
-      for (QueryUnit unit : units) {
-        // TODO: add each uri to every units
-        unit.addFetch(scan.getTableId(), uri);
-      }
-    }
-  }
-
-  /**
    * Unary QueryUnit들에 대하여 동일한 fragment를 할당
    * 
    * @param unitList
@@ -1587,80 +1248,4 @@ public class GlobalPlanner {
     }
     return unitList;
   }
-  
-  private Map<String, Map<ScanNode, List<Fragment>>> hashFragments(Map<ScanNode, 
-      List<Fragment>> fragMap) {
-    SortedMap<String, Map<ScanNode, List<Fragment>>> hashed = 
-        new TreeMap<String, Map<ScanNode,List<Fragment>>>();
-    String key;
-    Map<ScanNode, List<Fragment>> m;
-    List<Fragment> fragList;
-    for (Entry<ScanNode, List<Fragment>> e : fragMap.entrySet()) {
-      for (Fragment f : e.getValue()) {
-        key = f.getPath().getName();
-        if (hashed.containsKey(key)) {
-          m = hashed.get(key);
-        } else {
-          m = new HashMap<ScanNode, List<Fragment>>();
-        }
-        if (m.containsKey(e.getKey())) {
-          fragList = m.get(e.getKey());
-        } else {
-          fragList = new ArrayList<Fragment>();
-        }
-        fragList.add(f);
-        m.put(e.getKey(), fragList);
-        hashed.put(key, m);
-      }
-    }
-    
-    return hashed;
-  }
-  
-  private Collection<List<Fragment>> hashFragments(List<Fragment> frags) {
-    SortedMap<String, List<Fragment>> hashed = new TreeMap<String, List<Fragment>>();
-    for (Fragment f : frags) {
-      if (hashed.containsKey(f.getPath().getName())) {
-        hashed.get(f.getPath().getName()).add(f);
-      } else {
-        List<Fragment> list = new ArrayList<Fragment>();
-        list.add(f);
-        hashed.put(f.getPath().getName(), list);
-      }
-    }
-
-    return hashed.values();
-  }
-
-  public QueryUnit [] createLeafTasks(SubQuery subQuery) throws IOException {
-    ScanNode[] scans = subQuery.getScanNodes();
-    Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
-    TableMeta meta;
-    Path inputPath;
-
-    ScanNode scan = scans[0];
-    TableDesc desc = catalog.getTableDesc(scan.getTableId());
-    inputPath = desc.getPath();
-    meta = desc.getMeta();
-
-    // TODO - should be change the inner directory
-    Path oldPath = new Path(inputPath, "data");
-    FileSystem fs = inputPath.getFileSystem(conf);
-    if (fs.exists(oldPath)) {
-      inputPath = oldPath;
-    }
-    List<Fragment> fragments = sm.getSplits(scan.getTableId(), meta, inputPath);
-
-    QueryUnit queryUnit;
-    List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
-
-    int i = 0;
-    for (Fragment fragment : fragments) {
-      queryUnit = newQueryUnit(subQuery, i++);
-      queryUnit.setFragment(scan.getTableId(), fragment);
-      queryUnits.add(queryUnit);
-    }
-
-    return queryUnits.toArray(new QueryUnit[queryUnits.size()]);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
index cdd018f..292b9fd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
 import tajo.QueryConf;
 import tajo.QueryId;
-import tajo.QueryUnitId;
 import tajo.SubQueryId;
 import tajo.TajoProtos.QueryState;
 import tajo.catalog.TCatUtil;
@@ -65,8 +64,8 @@ public class Query implements EventHandler<QueryEvent> {
   private final EventHandler eventHandler;
   private final MasterPlan plan;
   private final StorageManager sm;
-  private PriorityQueue<SubQuery> scheduleQueue;
   private QueryContext context;
+  private ExecutionBlockCursor cursor;
 
   // Query Status
   private final QueryId id;
@@ -81,6 +80,7 @@ public class Query implements EventHandler<QueryEvent> {
   // Internal Variables
   private final Lock readLock;
   private final Lock writeLock;
+  private int priority = 100;
 
   // State Machine
   private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine;
@@ -115,7 +115,6 @@ public class Query implements EventHandler<QueryEvent> {
                final long appSubmitTime,
                final String queryStr,
                final EventHandler eventHandler,
-               final GlobalPlanner planner,
                final MasterPlan plan, final StorageManager sm) {
     this.context = context;
     this.conf = context.getConf();
@@ -127,13 +126,12 @@ public class Query implements EventHandler<QueryEvent> {
     this.eventHandler = eventHandler;
     this.plan = plan;
     this.sm = sm;
+    cursor = new ExecutionBlockCursor(plan);
 
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
     this.readLock = readWriteLock.readLock();
     this.writeLock = readWriteLock.writeLock();
 
-    this.scheduleQueue = new PriorityQueue<SubQuery>(1,new PriorityComparator());
-
     stateMachine = stateMachineFactory.make(this);
   }
 
@@ -145,10 +143,6 @@ public class Query implements EventHandler<QueryEvent> {
     return FileSystem.get(conf);
   }
 
-  protected StorageManager getStorageManager() {
-    return this.sm;
-  }
-
   public float getProgress() {
     QueryState state = getStateMachine().getCurrentState();
     if (state == QueryState.QUERY_SUCCEEDED) {
@@ -234,17 +228,6 @@ public class Query implements EventHandler<QueryEvent> {
     resultDesc = desc;
   }
 
-  class PriorityComparator implements Comparator<SubQuery> {
-    public PriorityComparator() {
-
-    }
-
-    @Override
-    public int compare(SubQuery s1, SubQuery s2) {
-      return s1.getPriority().get() - s2.getPriority().get();
-    }
-  }
-
   public MasterPlan getPlan() {
     return plan;
   }
@@ -253,36 +236,17 @@ public class Query implements EventHandler<QueryEvent> {
     return stateMachine;
   }
   
-  public void addSubQuery(SubQuery q) {
-    q.setQueryContext(context);
-    q.setEventHandler(eventHandler);
-    q.setClock(clock);
-    subqueries.put(q.getId(), q);
+  public void addSubQuery(SubQuery subquery) {
+    subqueries.put(subquery.getId(), subquery);
   }
   
   public QueryId getId() {
     return this.id;
   }
-
-  public String getQueryStr() {
-    return this.queryStr;
-  }
-
-  public Iterator<SubQuery> getSubQueryIterator() {
-    return this.subqueries.values().iterator();
-  }
   
   public SubQuery getSubQuery(SubQueryId id) {
     return this.subqueries.get(id);
   }
-  
-  public Collection<SubQuery> getSubQueries() {
-    return this.subqueries.values();
-  }
-  
-  public QueryUnit getQueryUnit(QueryUnitId id) {
-    return this.getSubQuery(id.getSubQueryId()).getQueryUnit(id);
-  }
 
   public QueryState getState() {
     readLock.lock();
@@ -293,8 +257,8 @@ public class Query implements EventHandler<QueryEvent> {
     }
   }
 
-  public int getScheduleQueueSize() {
-    return scheduleQueue.size();
+  public ExecutionBlockCursor getExecutionBlockCursor() {
+    return cursor;
   }
 
   static class InitTransition
@@ -303,136 +267,16 @@ public class Query implements EventHandler<QueryEvent> {
     @Override
     public QueryState transition(Query query, QueryEvent queryEvent) {
       query.setStartTime();
-      scheduleSubQueriesPostfix(query);
-      LOG.info("Scheduled SubQueries: " + query.getScheduleQueueSize());
-
-      return QueryState.QUERY_INIT;
-    }
-
-    private int priority = 0;
-
-    private void scheduleSubQueriesPostfix(Query query) {
-      SubQuery root = query.getPlan().getRoot();
-
-      scheduleSubQueriesPostfix_(query, root);
-
-      root.setPriority(priority);
-      query.addSubQuery(root);
-      query.schedule(root);
-    }
-    private void scheduleSubQueriesPostfix_(Query query, SubQuery current) {
-      if (current.hasChildQuery()) {
-        if (current.getChildQueries().size() == 1) {
-          SubQuery subQuery = current.getChildQueries().iterator().next();
-          scheduleSubQueriesPostfix_(query, subQuery);
-          identifySubQuery(subQuery);
-
-          query.addSubQuery(subQuery);
-          query.schedule(subQuery);
-
-          priority++;
-        } else {
-          Iterator<SubQuery> it = current.getChildQueries().iterator();
-          SubQuery outer = it.next();
-          SubQuery inner = it.next();
-
-          // Switch between outer and inner
-          // if an inner has a child and an outer doesn't.
-          // It is for left-deep-first search.
-          if (!outer.hasChildQuery() && inner.hasChildQuery()) {
-            SubQuery tmp = outer;
-            outer = inner;
-            inner = tmp;
-          }
-
-          scheduleSubQueriesPostfix_(query, outer);
-          scheduleSubQueriesPostfix_(query, inner);
-
-          identifySubQuery(outer);
-          identifySubQuery(inner);
-
-          query.addSubQuery(outer);
-          query.schedule(outer);
-          query.addSubQuery(inner);
-          query.schedule(inner);
-
-          priority++;
-        }
-      }
-    }
-
-    private void identifySubQuery(SubQuery subQuery) {
-      SubQuery parent = subQuery.getParentQuery();
-
-      if (!subQuery.hasChildQuery()) {
-
-        if (parent.getScanNodes().length == 2) {
-          Iterator<SubQuery> childIter = subQuery.getParentQuery().getChildIterator();
-
-          while (childIter.hasNext()) {
-            SubQuery another = childIter.next();
-            if (!subQuery.equals(another)) {
-              if (another.hasChildQuery()) {
-                subQuery.setPriority(++priority);
-              }
-            }
-          }
-
-          if (subQuery.getPriority() == null) {
-            subQuery.setPriority(0);
-          }
-        } else {
-          // if subQuery is leaf and not part of join.
-          if (!subQuery.hasChildQuery()) {
-            subQuery.setPriority(0);
-          }
-        }
-      } else {
-        subQuery.setPriority(priority);
-      }
-    }
-
-    private void scheduleSubQueries(Query query, SubQuery current) {
-      int priority = 0;
-
-      if (current.hasChildQuery()) {
-
-        int maxPriority = 0;
-        Iterator<SubQuery> it = current.getChildIterator();
 
-        while (it.hasNext()) {
-          SubQuery su = it.next();
-          scheduleSubQueries(query, su);
-
-          if (su.getPriority().get() > maxPriority) {
-            maxPriority = su.getPriority().get();
-          }
-        }
-
-        priority = maxPriority + 1;
-
-      } else {
-        SubQuery parent = current.getParentQuery();
-        priority = 0;
-
-        if (parent.getScanNodes().length == 2) {
-          Iterator<SubQuery> childIter = current.getParentQuery().getChildIterator();
-
-          while (childIter.hasNext()) {
-            SubQuery another = childIter.next();
-            if (!current.equals(another)) {
-              if (another.hasChildQuery()) {
-                priority = another.getPriority().get() + 1;
-              }
-            }
-          }
-        }
+      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+      while(cursor.hasNext()) {
+        ExecutionBlock block = cursor.nextBlock();
+        System.out.println(block.getId());
+        System.out.println(block.getPlan());
+        System.out.println("--------------------------------");
       }
-
-      current.setPriority(priority);
-      // TODO
-      query.addSubQuery(current);
-      query.schedule(current);
+      query.getExecutionBlockCursor().reset();
+      return QueryState.QUERY_INIT;
     }
   }
 
@@ -441,8 +285,11 @@ public class Query implements EventHandler<QueryEvent> {
 
     @Override
     public void transition(Query query, QueryEvent queryEvent) {
-      SubQuery subQuery = query.takeSubQuery();
-      LOG.info("Schedule unit plan: \n" + subQuery.getLogicalPlan());
+      SubQuery subQuery = new SubQuery(query.context, query.getExecutionBlockCursor().nextBlock(),
+          query.sm);
+      subQuery.setPriority(query.priority--);
+      query.addSubQuery(subQuery);
+      LOG.info("Schedule unit plan: \n" + subQuery.getBlock().getPlan());
       subQuery.handle(new SubQueryEvent(subQuery.getId(),
           SubQueryEventType.SQ_INIT));
     }
@@ -453,17 +300,27 @@ public class Query implements EventHandler<QueryEvent> {
 
     @Override
     public QueryState transition(Query query, QueryEvent event) {
-
+      // increase the count for completed subqueries
       query.completedSubQueryCount++;
       SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
+      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
 
+      // if the subquery succeeded
       if (castEvent.getFinalState() == SubQueryState.SUCCEEDED) {
-        SubQuerySucceeEvent succeeEvent = (SubQuerySucceeEvent) castEvent;
+        if (cursor.hasNext()) {
+          SubQuery nextSubQuery = new SubQuery(query.context, cursor.nextBlock(), query.sm);
+          nextSubQuery.setPriority(query.priority--);
+          query.addSubQuery(nextSubQuery);
+          nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(),
+              SubQueryEventType.SQ_INIT));
+          LOG.info("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority().get());
+          LOG.info("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+          QueryState state = query.checkQueryForCompleted();
+          return state;
 
-        SubQuery nextSubQuery = query.takeSubQuery();
-
-        if (nextSubQuery == null) {
+        } else {
           if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
+            SubQuerySucceeEvent succeeEvent = (SubQuerySucceeEvent) castEvent;
             SubQuery subQuery = query.getSubQuery(castEvent.getSubQueryId());
             TableDesc desc = new TableDescImpl(query.conf.getOutputTable(),
                 succeeEvent.getTableMeta(), query.context.getOutputPath());
@@ -478,20 +335,13 @@ public class Query implements EventHandler<QueryEvent> {
             if (query.context.isCreateTableQuery()) {
               query.context.getCatalog().addTable(desc);
             }
-            return query.finished(QueryState.QUERY_SUCCEEDED);
           }
-        }
 
-        nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(),
-            SubQueryEventType.SQ_INIT));
-        LOG.info("Scheduling SubQuery's Priority: " + (100 - nextSubQuery.getPriority().get()));
-        LOG.info("Scheduling SubQuery's Plan: \n" + nextSubQuery.getLogicalPlan());
-        QueryState state = query.checkQueryForCompleted();
-        return state;
-      } else if (castEvent.getFinalState() == SubQueryState.FAILED) {
-        return QueryState.QUERY_FAILED;
+          return query.finished(QueryState.QUERY_SUCCEEDED);
+        }
       } else {
-        return query.checkQueryForCompleted();
+        // if at least one subquery has failed, the query fails as well.
+        return QueryState.QUERY_FAILED;
       }
     }
   }
@@ -529,6 +379,10 @@ public class Query implements EventHandler<QueryEvent> {
     return finalState;
   }
 
+  /**
+   * Check if all subqueries of the query are completed
+   * @return QueryState.QUERY_SUCCEEDED if all subqueries are completed.
+   */
   QueryState checkQueryForCompleted() {
     if (completedSubQueryCount == subqueries.size()) {
       return QueryState.QUERY_SUCCEEDED;
@@ -563,75 +417,18 @@ public class Query implements EventHandler<QueryEvent> {
     }
   }
 
-  public void schedule(SubQuery subQuery) {
-    scheduleQueue.add(subQuery);
-  }
-
-  private SubQuery takeSubQuery() {
-    SubQuery unit = removeFromScheduleQueue();
-    if (unit == null) {
-      return null;
-    }
-    List<SubQuery> pended = new ArrayList<SubQuery>();
-    Priority priority = unit.getPriority();
-
-    do {
-      if (isReady(unit)) {
-        break;
-      } else {
-        pended.add(unit);
-      }
-      unit = removeFromScheduleQueue();
-      if (unit == null) {
-        scheduleQueue.addAll(pended);
-        return null;
-      }
-    } while (priority.equals(unit.getPriority()));
-
-    if (!priority.equals(unit.getPriority())) {
-      pended.add(unit);
-      unit = null;
-    }
-    scheduleQueue.addAll(pended);
-    return unit;
-  }
-
-  private boolean isReady(SubQuery subQuery) {
-    if (subQuery.hasChildQuery()) {
-      for (SubQuery child : subQuery.getChildQueries()) {
-        if (child.getState() !=
-            SubQueryState.SUCCEEDED) {
-          return false;
-        }
-      }
-      return true;
-    } else {
-      return true;
-    }
-  }
-
-  private SubQuery removeFromScheduleQueue() {
-    if (scheduleQueue.isEmpty()) {
-      return null;
-    } else {
-      return scheduleQueue.remove();
-    }
-  }
-
-
-
   private void writeStat(Path outputPath, SubQuery subQuery, TableStat stat)
       throws IOException {
-
-    if (subQuery.getLogicalPlan().getType() == ExprType.CREATE_INDEX) {
-      IndexWriteNode index = (IndexWriteNode) subQuery.getLogicalPlan();
+    ExecutionBlock execBlock = subQuery.getBlock();
+    if (execBlock.getPlan().getType() == ExprType.CREATE_INDEX) {
+      IndexWriteNode index = (IndexWriteNode) execBlock.getPlan();
       Path indexPath = new Path(sm.getTablePath(index.getTableName()), "index");
       TableMeta meta;
       if (sm.getFileSystem().exists(new Path(indexPath, ".meta"))) {
         meta = sm.getTableMeta(indexPath);
       } else {
         meta = TCatUtil
-            .newTableMeta(subQuery.getOutputSchema(), StoreType.CSV);
+            .newTableMeta(execBlock.getOutputSchema(), StoreType.CSV);
       }
       String indexName = IndexUtil.getIndexName(index.getTableName(),
           index.getSortSpecs());
@@ -641,7 +438,7 @@ public class Query implements EventHandler<QueryEvent> {
       sm.writeTableMeta(indexPath, meta);
 
     } else {
-      TableMeta meta = TCatUtil.newTableMeta(subQuery.getOutputSchema(),
+      TableMeta meta = TCatUtil.newTableMeta(execBlock.getOutputSchema(),
           StoreType.CSV);
       meta.setStat(stat);
       sm.writeTableMeta(outputPath, meta);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
index 8a6d75f..98878d3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
@@ -124,7 +124,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
       dispatcher.register(ContainerAllocatorEventType.class, rmAllocator);
 
       query = new Query(queryContext, queryId, clock, appSubmitTime,
-          "", dispatcher.getEventHandler(), null, masterPlan,
+          "", dispatcher.getEventHandler(), masterPlan,
           masterContext.getStorageManager());
       initStagingDir();
 
@@ -239,10 +239,18 @@ public class QueryMaster extends CompositeService implements EventHandler {
       return dispatcher;
     }
 
-    public Query getQuery(){
+    public Clock getClock() {
+      return clock;
+    }
+
+    public Query getQuery() {
       return query;
     }
 
+    public SubQuery getSubQuery(SubQueryId subQueryId) {
+      return query.getSubQuery(subQueryId);
+    }
+
     public QueryId getQueryId() {
       return queryId;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
index f0cbd25..7cf4422 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
@@ -23,24 +23,18 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import tajo.QueryIdFactory;
 import tajo.SubQueryId;
-import tajo.catalog.CatalogService;
-import tajo.catalog.Schema;
-import tajo.catalog.SortSpec;
-import tajo.catalog.TCatUtil;
+import tajo.catalog.*;
 import tajo.catalog.proto.CatalogProtos.StoreType;
 import tajo.catalog.statistics.TableStat;
 import tajo.conf.TajoConf.ConfVars;
 import tajo.engine.planner.PlannerUtil;
 import tajo.engine.planner.RangePartitionAlgorithm;
 import tajo.engine.planner.UniformRangePartition;
-import tajo.engine.planner.logical.GroupbyNode;
-import tajo.engine.planner.logical.ScanNode;
-import tajo.engine.planner.logical.SortNode;
-import tajo.engine.planner.logical.StoreTableNode;
+import tajo.engine.planner.logical.*;
 import tajo.engine.utils.TupleUtil;
 import tajo.exception.InternalException;
+import tajo.master.ExecutionBlock.PartitionType;
 import tajo.master.QueryUnit.IntermediateEntry;
-import tajo.master.SubQuery.PARTITION_TYPE;
 import tajo.storage.Fragment;
 import tajo.storage.TupleRange;
 import tajo.util.TUtil;
@@ -63,10 +57,10 @@ public class Repartitioner {
 
   public static QueryUnit [] createJoinTasks(SubQuery subQuery)
       throws IOException {
-
+    ExecutionBlock execBlock = subQuery.getBlock();
     CatalogService catalog = subQuery.queryContext.getCatalog();
 
-    ScanNode[] scans = subQuery.getScanNodes();
+    ScanNode[] scans = execBlock.getScanNodes();
     Path tablePath;
     Fragment [] fragments = new Fragment[2];
     TableStat [] stats = new TableStat[2];
@@ -91,7 +85,7 @@ public class Repartitioner {
       }
 
       // Getting a table stat for each scan
-      stats[i] = subQuery.getChildMaps().get(scans[i]).getStats();
+      stats[i] = subQuery.getChildQuery(scans[i]).getStats();
     }
 
     // Assigning either fragments or fetch urls to query units
@@ -100,7 +94,7 @@ public class Repartitioner {
       tasks = new QueryUnit[1];
       tasks[0] = new QueryUnit(QueryIdFactory.newQueryUnitId(subQuery.getId(), 0),
           false, subQuery.eventHandler);
-      tasks[0].setLogicalPlan(subQuery.getLogicalPlan());
+      tasks[0].setLogicalPlan(execBlock.getPlan());
       tasks[0].setFragment(scans[0].getTableId(), fragments[0]);
       tasks[0].setFragment(scans[1].getTableId(), fragments[1]);
     } else {
@@ -182,12 +176,13 @@ public class Repartitioner {
   }
 
   private static QueryUnit [] newEmptyJoinTask(SubQuery subQuery, Fragment [] fragments, int taskNum) {
+    ExecutionBlock execBlock = subQuery.getBlock();
     QueryUnit [] tasks = new QueryUnit[taskNum];
     for (int i = 0; i < taskNum; i++) {
       tasks[i] = new QueryUnit(
-          QueryIdFactory.newQueryUnitId(subQuery.getId(), i), subQuery.isLeafQuery(),
+          QueryIdFactory.newQueryUnitId(subQuery.getId(), i), execBlock.isLeafBlock(),
           subQuery.eventHandler);
-      tasks[i].setLogicalPlan(subQuery.getLogicalPlan());
+      tasks[i].setLogicalPlan(execBlock.getPlan());
       for (Fragment fragment : fragments) {
         tasks[i].setFragment2(fragment);
       }
@@ -199,7 +194,7 @@ public class Repartitioner {
   private static void addJoinPartition(QueryUnit task, SubQuery subQuery, int partitionId,
                                        Map<String, List<IntermediateEntry>> grouppedPartitions) {
 
-    for (ScanNode scanNode : subQuery.getScanNodes()) {
+    for (ScanNode scanNode : subQuery.getBlock().getScanNodes()) {
       Map<String, List<IntermediateEntry>> requests;
       if (grouppedPartitions.containsKey(scanNode.getTableId())) {
           requests = mergeHashPartitionRequest(grouppedPartitions.get(scanNode.getTableId()));
@@ -210,7 +205,7 @@ public class Repartitioner {
       for (Entry<String, List<IntermediateEntry>> requestPerNode : requests.entrySet()) {
         Collection<URI> uris = createHashFetchURL(requestPerNode.getKey(),
             subQuery.getChildQuery(scanNode).getId(),
-            partitionId, PARTITION_TYPE.HASH,
+            partitionId, PartitionType.HASH,
             requestPerNode.getValue());
         fetchURIs.addAll(uris);
       }
@@ -218,37 +213,6 @@ public class Repartitioner {
     }
   }
 
-  private static QueryUnit newJoinTask(SubQuery subQuery, int partitionId,
-      Fragment [] fragments,
-      Map<String, List<IntermediateEntry>> grouppedPartitions) {
-
-    QueryUnit task = new QueryUnit(
-        QueryIdFactory.newQueryUnitId(subQuery.getId()), subQuery.isLeafQuery(),
-        subQuery.eventHandler);
-    task.setLogicalPlan(subQuery.getLogicalPlan());
-
-    Map<String, Set<URI>> fetchURIsForEachRel = new HashMap<String, Set<URI>>();
-    int i = 0;
-    for (ScanNode scanNode : subQuery.getScanNodes()) {
-      Map<String, List<IntermediateEntry>> mergedHashPartitionRequest =
-          mergeHashPartitionRequest(grouppedPartitions.get(scanNode.getTableId()));
-      Set<URI> fetchURIs = TUtil.newHashSet();
-      for (Entry<String, List<IntermediateEntry>> requestPerNode
-          : mergedHashPartitionRequest.entrySet()) {
-        Collection<URI> uris = createHashFetchURL(requestPerNode.getKey(),
-            subQuery.getChildQuery(scanNode).getId(),
-            partitionId, PARTITION_TYPE.HASH,
-            requestPerNode.getValue());
-        fetchURIs.addAll(uris);
-      }
-      fetchURIsForEachRel.put(scanNode.getTableId(), fetchURIs);
-      task.setFragment2(fragments[i++]);
-    }
-
-    task.setFetches(fetchURIsForEachRel);
-    return task;
-  }
-
   /**
    * This method merges the partition request associated with the pullserver's address.
    * It reduces the number of TCP connections.
@@ -274,9 +238,10 @@ public class Repartitioner {
                                                SubQuery childSubQuery,
                                                int maxNum)
       throws InternalException {
-    if (childSubQuery.getOutputType() == PARTITION_TYPE.HASH) {
+    ExecutionBlock childExecBlock = childSubQuery.getBlock();
+    if (childExecBlock.getPartitionType() == PartitionType.HASH) {
       return createHashPartitionedTasks(subQuery, childSubQuery, maxNum);
-    } else if (childSubQuery.getOutputType() == PARTITION_TYPE.RANGE) {
+    } else if (childExecBlock.getPartitionType() == PartitionType.RANGE) {
       return createRangePartitionedTasks(subQuery, childSubQuery, maxNum);
     } else {
       throw new InternalException("Cannot support partition type");
@@ -287,17 +252,17 @@ public class Repartitioner {
                                                          SubQuery childSubQuery,
                                                          int maxNum)
       throws InternalException {
-
+    ExecutionBlock execBlock = subQuery.getBlock();
     TableStat stat = childSubQuery.getStats();
     if (stat.getNumRows() == 0) {
       return new QueryUnit[0];
     }
 
-    ScanNode scan = subQuery.getScanNodes()[0];
+    ScanNode scan = execBlock.getScanNodes()[0];
     Path tablePath;
     tablePath = subQuery.sm.getTablePath(scan.getTableId());
 
-    StoreTableNode store = (StoreTableNode) childSubQuery.getLogicalPlan();
+    StoreTableNode store = (StoreTableNode) childSubQuery.getBlock().getPlan();
     SortNode sort = (SortNode) store.getSubNode();
     SortSpec[] sortSpecs = sort.getSortKeys();
     Schema sortSchema = PlannerUtil.sortSpecsToSchema(sort.getSortKeys());
@@ -331,7 +296,9 @@ public class Repartitioner {
 
     List<String> basicFetchURIs = new ArrayList<String>();
 
-    for (QueryUnit qu : subQuery.getChildQuery(scan).getQueryUnits()) {
+    SubQuery child = childSubQuery.queryContext.getSubQuery(
+        subQuery.getBlock().getChildBlock(scan).getId());
+    for (QueryUnit qu : child.getQueryUnits()) {
       for (IntermediateEntry p : qu.getIntermediateData()) {
         String uri = createBasicFetchUri(p.getPullHost(), p.getPullPort(),
             childSubQuery.getId(), p.taskId, p.attemptId);
@@ -404,13 +371,13 @@ public class Repartitioner {
   public static QueryUnit [] createHashPartitionedTasks(SubQuery subQuery,
                                                  SubQuery childSubQuery,
                                                  int maxNum) {
-
+    ExecutionBlock execBlock = subQuery.getBlock();
     TableStat stat = childSubQuery.getStats();
     if (stat.getNumRows() == 0) {
       return new QueryUnit[0];
     }
 
-    ScanNode scan = subQuery.getScanNodes()[0];
+    ScanNode scan = execBlock.getScanNodes()[0];
     Path tablePath;
     tablePath = subQuery.sm.getTablePath(scan.getTableId());
 
@@ -434,7 +401,7 @@ public class Repartitioner {
       for (Entry<String, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
         Collection<URI> uris = createHashFetchURL(e.getKey(), childSubQuery.getId(),
             interm.getKey(),
-            childSubQuery.getStoreTableNode().getPartitionType(), e.getValue());
+            childSubQuery.getBlock().getPartitionType(), e.getValue());
 
         if (finalFetchURI.containsKey(interm.getKey())) {
           finalFetchURI.get(interm.getKey()).addAll(uris);
@@ -444,7 +411,7 @@ public class Repartitioner {
       }
     }
 
-    GroupbyNode groupby = (GroupbyNode) childSubQuery.getStoreTableNode().
+    GroupbyNode groupby = (GroupbyNode) childSubQuery.getBlock().getStoreTableNode().
         getSubNode();
     // the number of tasks cannot exceed the number of merged fetch uris.
     int determinedTaskNum = Math.min(maxNum, finalFetchURI.size());
@@ -471,7 +438,7 @@ public class Repartitioner {
   }
 
   public static Collection<URI> createHashFetchURL(String hostAndPort, SubQueryId childSid,
-                                       int partitionId, PARTITION_TYPE type,
+                                       int partitionId, PartitionType type,
                                        List<IntermediateEntry> entries) {
     String scheme = "http://";
     StringBuilder urlPrefix = new StringBuilder(scheme);
@@ -479,9 +446,9 @@ public class Repartitioner {
         .append("/?").append("sid=").append(childSid.getId())
         .append("&").append("p=").append(partitionId)
         .append("&").append("type=");
-    if (type == PARTITION_TYPE.HASH) {
+    if (type == PartitionType.HASH) {
       urlPrefix.append("h");
-    } else if (type == PARTITION_TYPE.RANGE) {
+    } else if (type == PartitionType.RANGE) {
       urlPrefix.append("r");
     }
     urlPrefix.append("&ta=");
@@ -542,12 +509,13 @@ public class Repartitioner {
 
   public static QueryUnit [] createEmptyNonLeafTasks(SubQuery subQuery, int num,
                                                      Fragment frag) {
+    LogicalNode plan = subQuery.getBlock().getPlan();
     QueryUnit [] tasks = new QueryUnit[num];
     for (int i = 0; i < num; i++) {
       tasks[i] = new QueryUnit(QueryIdFactory.newQueryUnitId(subQuery.getId(), i),
           false, subQuery.eventHandler);
       tasks[i].setFragment2(frag);
-      tasks[i].setLogicalPlan(subQuery.getLogicalPlan());
+      tasks[i].setLogicalPlan(plan);
     }
     return tasks;
   }
@@ -568,4 +536,47 @@ public class Repartitioner {
 
     return hashed;
   }
+
+  public static SubQuery setPartitionNumberForTwoPhase(SubQuery subQuery, final int n) {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    Column[] keys = null;
+    // If the parent block performs a join,
+    // set the partition number for the current execution block.
+    // TODO: union handling is required when a join has unions as its children
+    ExecutionBlock parentBlock = execBlock.getParentBlock();
+    if (parentBlock != null) {
+      if (parentBlock.getStoreTableNode().getSubNode().getType() == ExprType.JOIN) {
+        execBlock.getStoreTableNode().setPartitions(execBlock.getPartitionType(),
+            execBlock.getStoreTableNode().getPartitionKeys(), n);
+        keys = execBlock.getStoreTableNode().getPartitionKeys();
+      }
+    }
+
+    StoreTableNode store = execBlock.getStoreTableNode();
+    // choose the partition keys for group-by (hash) and sort (range)
+    if (execBlock.getPartitionType() == PartitionType.HASH) {
+      if (store.getSubNode().getType() == ExprType.GROUP_BY) {
+        GroupbyNode groupby = (GroupbyNode)store.getSubNode();
+        keys = groupby.getGroupingColumns();
+      }
+    } else if (execBlock.getPartitionType() == PartitionType.RANGE) {
+      if (store.getSubNode().getType() == ExprType.SORT) {
+        SortNode sort = (SortNode)store.getSubNode();
+        keys = new Column[sort.getSortKeys().length];
+        for (int i = 0; i < keys.length; i++) {
+          keys[i] = sort.getSortKeys()[i].getSortKey();
+        }
+      }
+    }
+    if (keys != null) {
+      if (keys.length == 0) {
+        store.setPartitions(execBlock.getPartitionType(), new Column[]{}, 1);
+      } else {
+        store.setPartitions(execBlock.getPartitionType(), keys, n);
+      }
+    } else {
+      store.setListPartition();
+    }
+    return subQuery;
+  }
 }