Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/22 08:50:45 UTC

[3/4] tajo git commit: TAJO-1262: Rename the prefix 'SubQuery' to 'Stage'.
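
The rename is mechanical and behavior-preserving: the SubQuery class, its
accessors (e.g. getSubQuery()), and its static helpers become the Stage
equivalents. A minimal sketch of the pattern, condensed from the hunks below
(ebId and fetches stand in for the surrounding context):

    // before (SubQuery API)
    SubQuery subQuery = masterContext.getSubQuery(ebId);
    SubQuery.scheduleFetches(subQuery, fetches);

    // after (Stage API)
    Stage stage = masterContext.getStage(ebId);
    Stage.scheduleFetches(stage, fetches);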

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index cf6b917..4cf6ce2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -81,11 +81,11 @@ public class Repartitioner {
   private final static int HTTP_REQUEST_MAXIMUM_LENGTH = 1900;
   private final static String UNKNOWN_HOST = "unknown";
 
-  public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, SubQuery subQuery)
+  public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, Stage stage)
       throws IOException {
-    MasterPlan masterPlan = subQuery.getMasterPlan();
-    ExecutionBlock execBlock = subQuery.getBlock();
-    QueryMasterTask.QueryMasterTaskContext masterContext = subQuery.getContext();
+    MasterPlan masterPlan = stage.getMasterPlan();
+    ExecutionBlock execBlock = stage.getBlock();
+    QueryMasterTask.QueryMasterTaskContext masterContext = stage.getContext();
 
     ScanNode[] scans = execBlock.getScanNodes();
 
@@ -98,17 +98,17 @@ public class Repartitioner {
       TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName());
       if (tableDesc == null) { // if it is a real table stored on storage
         FileStorageManager storageManager =
-            (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf());
+            (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf());
 
         tablePath = storageManager.getTablePath(scans[i].getTableName());
         if (execBlock.getUnionScanMap() != null && !execBlock.getUnionScanMap().isEmpty()) {
           for (Map.Entry<ExecutionBlockId, ExecutionBlockId> unionScanEntry: execBlock.getUnionScanMap().entrySet()) {
             ExecutionBlockId originScanEbId = unionScanEntry.getKey();
-            stats[i] += masterContext.getSubQuery(originScanEbId).getResultStats().getNumBytes();
+            stats[i] += masterContext.getStage(originScanEbId).getResultStats().getNumBytes();
           }
         } else {
           ExecutionBlockId scanEBId = TajoIdUtils.createExecutionBlockId(scans[i].getTableName());
-          stats[i] = masterContext.getSubQuery(scanEBId).getResultStats().getNumBytes();
+          stats[i] = masterContext.getStage(scanEBId).getResultStats().getNumBytes();
         }
         fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
       } else {
@@ -119,7 +119,7 @@ public class Repartitioner {
         }
 
         StorageManager storageManager =
-            StorageManager.getStorageManager(subQuery.getContext().getConf(), tableDesc.getMeta().getStoreType());
+            StorageManager.getStorageManager(stage.getContext().getConf(), tableDesc.getMeta().getStoreType());
 
         // if the table has no data, storageManager will return an empty FileFragment.
         // So, we need to handle the FileFragment by its size.
@@ -223,7 +223,7 @@ public class Repartitioner {
       execBlock.removeBroadcastTable(scans[baseScanIdx].getCanonicalName());
       LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join with all tables, base_table=%s, base_volume=%d",
           scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
-      scheduleLeafTasksWithBroadcastTable(schedulerContext, subQuery, baseScanIdx, fragments);
+      scheduleLeafTasksWithBroadcastTable(schedulerContext, stage, baseScanIdx, fragments);
     } else if (!execBlock.getBroadcastTables().isEmpty()) { // If some relations of this EB are broadcasted
       boolean hasNonLeafNode = false;
       List<Integer> largeScanIndexList = new ArrayList<Integer>();
@@ -266,7 +266,7 @@ public class Repartitioner {
         int baseScanIdx = largeScanIndexList.isEmpty() ? maxStatsScanIdx : largeScanIndexList.get(0);
         LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, base_table=%s, base_volume=%d",
             scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
-        scheduleLeafTasksWithBroadcastTable(schedulerContext, subQuery, baseScanIdx, fragments);
+        scheduleLeafTasksWithBroadcastTable(schedulerContext, stage, baseScanIdx, fragments);
       } else {
         if (largeScanIndexList.size() > 2) {
           throw new IOException("Symmetric Repartition Join should have two scan node, but " + nonLeafScanNames);
@@ -292,12 +292,12 @@ public class Repartitioner {
           index++;
         }
         LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, join_node=%s", nonLeafScanNames));
-        scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, subQuery,
+        scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, stage,
             intermediateScans, intermediateScanStats, intermediateFragments, broadcastScans, broadcastFragments);
       }
     } else {
       LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join");
-      scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, subQuery, scans, stats, fragments, null, null);
+      scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, stage, scans, stats, fragments, null, null);
     }
   }
 
@@ -305,7 +305,7 @@ public class Repartitioner {
    * Scheduling in the case of Symmetric Repartition Join
    * @param masterContext
    * @param schedulerContext
-   * @param subQuery
+   * @param stage
    * @param scans
    * @param stats
    * @param fragments
@@ -313,21 +313,21 @@ public class Repartitioner {
    */
   private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMasterTaskContext masterContext,
                                                        TaskSchedulerContext schedulerContext,
-                                                       SubQuery subQuery,
+                                                       Stage stage,
                                                        ScanNode[] scans,
                                                        long[] stats,
                                                        Fragment[] fragments,
                                                        ScanNode[] broadcastScans,
                                                        Fragment[] broadcastFragments) throws IOException {
-    MasterPlan masterPlan = subQuery.getMasterPlan();
-    ExecutionBlock execBlock = subQuery.getBlock();
+    MasterPlan masterPlan = stage.getMasterPlan();
+    ExecutionBlock execBlock = stage.getBlock();
     // The hash map is modeled as follows:
     // <Part Id, <EbId, List<Intermediate Data>>>
     Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> hashEntries =
         new HashMap<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>();
 
     // Grouping IntermediateData by a partition key and a table name
-    List<ExecutionBlock> childBlocks = masterPlan.getChilds(subQuery.getId());
+    List<ExecutionBlock> childBlocks = masterPlan.getChilds(stage.getId());
 
     // In the case of join with union, there is one ScanNode for union.
     Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = execBlock.getUnionScanMap();
@@ -336,7 +336,7 @@ public class Repartitioner {
       if (scanEbId == null) {
         scanEbId = childBlock.getId();
       }
-      SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId());
+      Stage childExecSM = stage.getContext().getStage(childBlock.getId());
 
       if (childExecSM.getHashShuffleIntermediateEntries() != null &&
           !childExecSM.getHashShuffleIntermediateEntries().isEmpty()) {
@@ -387,7 +387,7 @@ public class Repartitioner {
     // Getting the desired number of join tasks according to the volume
     // of the larger table
     int largerIdx = stats[0] >= stats[1] ? 0 : 1;
-    int desireJoinTaskVolumn = subQuery.getMasterPlan().getContext().getInt(SessionVars.JOIN_TASK_INPUT_SIZE);
+    int desireJoinTaskVolumn = stage.getMasterPlan().getContext().getInt(SessionVars.JOIN_TASK_INPUT_SIZE);
 
     // calculate the number of tasks according to the data size
     int mb = (int) Math.ceil((double) stats[largerIdx] / 1048576);
@@ -412,7 +412,7 @@ public class Repartitioner {
         TableDesc tableDesc = masterContext.getTableDescMap().get(eachScan.getCanonicalName());
         if (eachScan.getType() == NodeType.PARTITIONS_SCAN) {
           FileStorageManager storageManager =
-              (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf());
+              (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf());
 
           PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)eachScan;
           partitionScanPaths = partitionScan.getInputPaths();
@@ -420,7 +420,7 @@ public class Repartitioner {
           getFragmentsFromPartitionedTable(storageManager, eachScan, tableDesc);
           partitionScan.setInputPaths(partitionScanPaths);
         } else {
-          StorageManager storageManager = StorageManager.getStorageManager(subQuery.getContext().getConf(),
+          StorageManager storageManager = StorageManager.getStorageManager(stage.getContext().getConf(),
               tableDesc.getMeta().getStoreType());
           Collection<Fragment> scanFragments = storageManager.getSplits(eachScan.getCanonicalName(),
               tableDesc, eachScan);
@@ -430,12 +430,12 @@ public class Repartitioner {
         }
       }
     }
-    SubQuery.scheduleFragment(subQuery, fragments[0], rightFragments);
+    Stage.scheduleFragment(stage, fragments[0], rightFragments);
 
     // Assign partitions to tasks in a round robin manner.
     for (Entry<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> entry
         : hashEntries.entrySet()) {
-      addJoinShuffle(subQuery, entry.getKey(), entry.getValue());
+      addJoinShuffle(stage, entry.getKey(), entry.getValue());
     }
 
     schedulerContext.setTaskSize((int) Math.ceil((double) bothFetchSize / joinTaskNum));
@@ -503,9 +503,9 @@ public class Repartitioner {
     return fragments;
   }
 
-  private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, SubQuery subQuery,
+  private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, Stage stage,
                                                           int baseScanId, Fragment[] fragments) throws IOException {
-    ExecutionBlock execBlock = subQuery.getBlock();
+    ExecutionBlock execBlock = stage.getBlock();
     ScanNode[] scans = execBlock.getScanNodes();
 
     for (int i = 0; i < scans.length; i++) {
@@ -527,7 +527,7 @@ public class Repartitioner {
     List<Fragment> broadcastFragments = new ArrayList<Fragment>();
     for (int i = 0; i < scans.length; i++) {
       ScanNode scan = scans[i];
-      TableDesc desc = subQuery.getContext().getTableDescMap().get(scan.getCanonicalName());
+      TableDesc desc = stage.getContext().getTableDescMap().get(scan.getCanonicalName());
       TableMeta meta = desc.getMeta();
 
       Collection<Fragment> scanFragments;
@@ -537,11 +537,11 @@ public class Repartitioner {
         partitionScanPaths = partitionScan.getInputPaths();
         // set null to inputPaths in getFragmentsFromPartitionedTable()
         FileStorageManager storageManager =
-            (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf());
+            (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf());
         scanFragments = getFragmentsFromPartitionedTable(storageManager, scan, desc);
       } else {
         StorageManager storageManager =
-            StorageManager.getStorageManager(subQuery.getContext().getConf(), desc.getMeta().getStoreType());
+            StorageManager.getStorageManager(stage.getContext().getConf(), desc.getMeta().getStoreType());
 
         scanFragments = storageManager.getSplits(scan.getCanonicalName(), desc, scan);
       }
@@ -565,14 +565,14 @@ public class Repartitioner {
       throw new IOException("No fragments for " + scans[baseScanId].getTableName());
     }
 
-    SubQuery.scheduleFragments(subQuery, baseFragments, broadcastFragments);
+    Stage.scheduleFragments(stage, baseFragments, broadcastFragments);
     schedulerContext.setEstimatedTaskNum(baseFragments.size());
   }
 
-  private static void addJoinShuffle(SubQuery subQuery, int partitionId,
+  private static void addJoinShuffle(Stage stage, int partitionId,
                                      Map<ExecutionBlockId, List<IntermediateEntry>> grouppedPartitions) {
     Map<String, List<FetchImpl>> fetches = new HashMap<String, List<FetchImpl>>();
-    for (ExecutionBlock execBlock : subQuery.getMasterPlan().getChilds(subQuery.getId())) {
+    for (ExecutionBlock execBlock : stage.getMasterPlan().getChilds(stage.getId())) {
       if (grouppedPartitions.containsKey(execBlock.getId())) {
         Collection<FetchImpl> requests = mergeShuffleRequest(partitionId, HASH_SHUFFLE,
             grouppedPartitions.get(execBlock.getId()));
@@ -581,10 +581,10 @@ public class Repartitioner {
     }
 
     if (fetches.isEmpty()) {
-      LOG.info(subQuery.getId() + "'s " + partitionId + " partition has empty result.");
+      LOG.info(stage.getId() + "'s " + partitionId + " partition has empty result.");
       return;
     }
-    SubQuery.scheduleFetches(subQuery, fetches);
+    Stage.scheduleFetches(stage, fetches);
   }
 
   /**
@@ -616,14 +616,14 @@ public class Repartitioner {
   }
 
   public static void scheduleFragmentsForNonLeafTasks(TaskSchedulerContext schedulerContext,
-                                                      MasterPlan masterPlan, SubQuery subQuery, int maxNum)
+                                                      MasterPlan masterPlan, Stage stage, int maxNum)
       throws IOException {
-    DataChannel channel = masterPlan.getIncomingChannels(subQuery.getBlock().getId()).get(0);
+    DataChannel channel = masterPlan.getIncomingChannels(stage.getBlock().getId()).get(0);
     if (channel.getShuffleType() == HASH_SHUFFLE
         || channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) {
-      scheduleHashShuffledFetches(schedulerContext, masterPlan, subQuery, channel, maxNum);
+      scheduleHashShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum);
     } else if (channel.getShuffleType() == RANGE_SHUFFLE) {
-      scheduleRangeShuffledFetches(schedulerContext, masterPlan, subQuery, channel, maxNum);
+      scheduleRangeShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum);
     } else {
       throw new InternalException("Cannot support partition type");
     }
@@ -634,22 +634,22 @@ public class Repartitioner {
     List<TableStats> tableStatses = new ArrayList<TableStats>();
     List<ExecutionBlock> childBlocks = masterPlan.getChilds(parentBlockId);
     for (ExecutionBlock childBlock : childBlocks) {
-      SubQuery childExecSM = context.getSubQuery(childBlock.getId());
-      tableStatses.add(childExecSM.getResultStats());
+      Stage childStage = context.getStage(childBlock.getId());
+      tableStatses.add(childStage.getResultStats());
     }
     return StatisticsUtil.aggregateTableStat(tableStatses);
   }
 
   public static void scheduleRangeShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan,
-                                                  SubQuery subQuery, DataChannel channel, int maxNum)
+                                                  Stage stage, DataChannel channel, int maxNum)
       throws IOException {
-    ExecutionBlock execBlock = subQuery.getBlock();
+    ExecutionBlock execBlock = stage.getBlock();
     ScanNode scan = execBlock.getScanNodes()[0];
     Path tablePath;
-    tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()))
+    tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf()))
         .getTablePath(scan.getTableName());
 
-    ExecutionBlock sampleChildBlock = masterPlan.getChild(subQuery.getId(), 0);
+    ExecutionBlock sampleChildBlock = masterPlan.getChild(stage.getId(), 0);
     SortNode sortNode = PlannerUtil.findTopNode(sampleChildBlock.getPlan(), NodeType.SORT);
     SortSpec [] sortSpecs = sortNode.getSortKeys();
     Schema sortSchema = new Schema(channel.getShuffleKeys());
@@ -658,7 +658,7 @@ public class Repartitioner {
     int determinedTaskNum;
 
     // calculate the maximum number of query ranges
-    TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId());
+    TableStats totalStat = computeChildBlocksStats(stage.getContext(), masterPlan, stage.getId());
 
     // If there is an empty table in inner join, it should return zero rows.
     if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0) {
@@ -668,15 +668,15 @@ public class Repartitioner {
 
     if (sortNode.getSortPurpose() == SortPurpose.STORAGE_SPECIFIED) {
       StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan());
-      CatalogService catalog = subQuery.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
+      CatalogService catalog = stage.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
       LogicalRootNode rootNode = masterPlan.getLogicalPlan().getRootBlock().getRoot();
       TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
       if (tableDesc == null) {
         throw new IOException("Can't get table meta data from catalog: " +
             PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan()));
       }
-      ranges = StorageManager.getStorageManager(subQuery.getContext().getConf(), storeType)
-          .getInsertSortRanges(subQuery.getContext().getQueryContext(), tableDesc,
+      ranges = StorageManager.getStorageManager(stage.getContext().getConf(), storeType)
+          .getInsertSortRanges(stage.getContext().getQueryContext(), tableDesc,
               sortNode.getInSchema(), sortSpecs,
               mergedRange);
       determinedTaskNum = ranges.length;
@@ -687,36 +687,36 @@ public class Repartitioner {
       // if the range cardinality is less than the desired number of tasks,
       // we set the number of tasks to the range cardinality.
       if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) {
-        LOG.info(subQuery.getId() + ", The range cardinality (" + card
+        LOG.info(stage.getId() + ", The range cardinality (" + card
             + ") is less then the desired number of tasks (" + maxNum + ")");
         determinedTaskNum = card.intValue();
       } else {
         determinedTaskNum = maxNum;
       }
 
-      LOG.info(subQuery.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum +
+      LOG.info(stage.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum +
           " sub ranges (total units: " + determinedTaskNum + ")");
       ranges = partitioner.partition(determinedTaskNum);
       if (ranges == null || ranges.length == 0) {
-        LOG.warn(subQuery.getId() + " no range infos.");
+        LOG.warn(stage.getId() + " no range infos.");
       }
       TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges);
       if (LOG.isDebugEnabled()) {
         if (ranges != null) {
           for (TupleRange eachRange : ranges) {
-            LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd());
+            LOG.debug(stage.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd());
           }
         }
       }
     }
 
     FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
-    SubQuery.scheduleFragment(subQuery, dummyFragment);
+    Stage.scheduleFragment(stage, dummyFragment);
 
     List<FetchImpl> fetches = new ArrayList<FetchImpl>();
-    List<ExecutionBlock> childBlocks = masterPlan.getChilds(subQuery.getId());
+    List<ExecutionBlock> childBlocks = masterPlan.getChilds(stage.getId());
     for (ExecutionBlock childBlock : childBlocks) {
-      SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId());
+      Stage childExecSM = stage.getContext().getStage(childBlock.getId());
       for (Task qu : childExecSM.getTasks()) {
         for (IntermediateEntry p : qu.getIntermediateData()) {
           FetchImpl fetch = new FetchImpl(p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0);
@@ -758,12 +758,12 @@ public class Repartitioner {
       LOG.error(e);
     }
 
-    scheduleFetchesByRoundRobin(subQuery, map, scan.getTableName(), determinedTaskNum);
+    scheduleFetchesByRoundRobin(stage, map, scan.getTableName(), determinedTaskNum);
 
     schedulerContext.setEstimatedTaskNum(determinedTaskNum);
   }
 
-  public static void scheduleFetchesByRoundRobin(SubQuery subQuery, Map<?, Collection<FetchImpl>> partitions,
+  public static void scheduleFetchesByRoundRobin(Stage stage, Map<?, Collection<FetchImpl>> partitions,
                                                    String tableName, int num) {
     int i;
     Map<String, List<FetchImpl>>[] fetchesArray = new Map[num];
@@ -777,7 +777,7 @@ public class Repartitioner {
       if (i == num) i = 0;
     }
     for (Map<String, List<FetchImpl>> eachFetches : fetchesArray) {
-      SubQuery.scheduleFetches(subQuery, eachFetches);
+      Stage.scheduleFetches(stage, eachFetches);
     }
   }
 
@@ -807,18 +807,18 @@ public class Repartitioner {
   }
 
   public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan,
-                                                 SubQuery subQuery, DataChannel channel,
+                                                 Stage stage, DataChannel channel,
                                                  int maxNum) throws IOException {
-    ExecutionBlock execBlock = subQuery.getBlock();
+    ExecutionBlock execBlock = stage.getBlock();
     ScanNode scan = execBlock.getScanNodes()[0];
     Path tablePath;
-    tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()))
+    tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf()))
         .getTablePath(scan.getTableName());
 
     Fragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
     List<Fragment> fragments = new ArrayList<Fragment>();
     fragments.add(frag);
-    SubQuery.scheduleFragments(subQuery, fragments);
+    Stage.scheduleFragments(stage, fragments);
 
     Map<Integer, FetchGroupMeta> finalFetches = new HashMap<Integer, FetchGroupMeta>();
     Map<ExecutionBlockId, List<IntermediateEntry>> intermediates = new HashMap<ExecutionBlockId,
@@ -826,7 +826,7 @@ public class Repartitioner {
 
     for (ExecutionBlock block : masterPlan.getChilds(execBlock)) {
       List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
-      partitions.addAll(subQuery.getContext().getSubQuery(block.getId()).getHashShuffleIntermediateEntries());
+      partitions.addAll(stage.getContext().getStage(block.getId()).getHashShuffleIntermediateEntries());
 
       // In scattered hash shuffle, collect each IntermediateEntry
       if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) {
@@ -861,16 +861,16 @@ public class Repartitioner {
     }
 
     int groupingColumns = 0;
-    LogicalNode[] groupbyNodes = PlannerUtil.findAllNodes(subQuery.getBlock().getPlan(),
+    LogicalNode[] groupbyNodes = PlannerUtil.findAllNodes(stage.getBlock().getPlan(),
         new NodeType[]{NodeType.GROUP_BY, NodeType.DISTINCT_GROUP_BY});
     if (groupbyNodes != null && groupbyNodes.length > 0) {
       LogicalNode bottomNode = groupbyNodes[0];
       if (bottomNode.getType() == NodeType.GROUP_BY) {
         groupingColumns = ((GroupbyNode)bottomNode).getGroupingColumns().length;
       } else if (bottomNode.getType() == NodeType.DISTINCT_GROUP_BY) {
-        DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
+        DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(stage.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
         if (distinctNode == null) {
-          LOG.warn(subQuery.getId() + ", Can't find current DistinctGroupbyNode");
+          LOG.warn(stage.getId() + ", Can't find current DistinctGroupbyNode");
           distinctNode = (DistinctGroupbyNode)bottomNode;
         }
         groupingColumns = distinctNode.getGroupingColumns().length;
@@ -879,8 +879,8 @@ public class Repartitioner {
         EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode);
         if (property != null) {
           if (property.getDistinct().getIsMultipleAggregation()) {
-            MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage();
-            if (stage != MultipleAggregationStage.THRID_STAGE) {
+            MultipleAggregationStage mulAggStage = property.getDistinct().getMultipleAggregationStage();
+            if (mulAggStage != MultipleAggregationStage.THRID_STAGE) {
               groupingColumns = distinctNode.getOutSchema().size();
             }
           }
@@ -889,13 +889,13 @@ public class Repartitioner {
     }
     // get a proper number of tasks
     int determinedTaskNum = Math.min(maxNum, finalFetches.size());
-    LOG.info(subQuery.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetches.size());
+    LOG.info(stage.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetches.size());
 
     if (groupingColumns == 0) {
       determinedTaskNum = 1;
-      LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
+      LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
     } else {
-      TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId());
+      TableStats totalStat = computeChildBlocksStats(stage.getContext(), masterPlan, stage.getId());
       if (totalStat.getNumRows() == 0) {
         determinedTaskNum = 1;
       }
@@ -903,13 +903,13 @@ public class Repartitioner {
 
     // set the proper number of tasks to the estimated task num
     if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) {
-      scheduleScatteredHashShuffleFetches(schedulerContext, subQuery, intermediates,
+      scheduleScatteredHashShuffleFetches(schedulerContext, stage, intermediates,
           scan.getTableName());
     } else {
       schedulerContext.setEstimatedTaskNum(determinedTaskNum);
       // divide fetch URIs into the proper number of tasks according to volumes
-      scheduleFetchesByEvenDistributedVolumes(subQuery, finalFetches, scan.getTableName(), determinedTaskNum);
-      LOG.info(subQuery.getId() + ", DeterminedTaskNum : " + determinedTaskNum);
+      scheduleFetchesByEvenDistributedVolumes(stage, finalFetches, scan.getTableName(), determinedTaskNum);
+      LOG.info(stage.getId() + ", DeterminedTaskNum : " + determinedTaskNum);
     }
   }
 
@@ -970,12 +970,12 @@ public class Repartitioner {
     return new Pair<Long[], Map<String, List<FetchImpl>>[]>(assignedVolumes, fetchesArray);
   }
 
-  public static void scheduleFetchesByEvenDistributedVolumes(SubQuery subQuery, Map<Integer, FetchGroupMeta> partitions,
+  public static void scheduleFetchesByEvenDistributedVolumes(Stage stage, Map<Integer, FetchGroupMeta> partitions,
                                                              String tableName, int num) {
     Map<String, List<FetchImpl>>[] fetchsArray = makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond();
     // Schedule FetchImpls
     for (Map<String, List<FetchImpl>> eachFetches : fetchsArray) {
-      SubQuery.scheduleFetches(subQuery, eachFetches);
+      Stage.scheduleFetches(stage, eachFetches);
     }
   }
 
@@ -987,12 +987,12 @@ public class Repartitioner {
   // to $DIST_QUERY_TABLE_PARTITION_VOLUME. Finally, each subgroup is assigned to a query unit.
   // It is usually used for writing partitioned tables.
   public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext schedulerContext,
-       SubQuery subQuery, Map<ExecutionBlockId, List<IntermediateEntry>> intermediates,
+       Stage stage, Map<ExecutionBlockId, List<IntermediateEntry>> intermediates,
        String tableName) {
     long splitVolume = StorageUnit.MB *
-        subQuery.getMasterPlan().getContext().getLong(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE);
+        stage.getMasterPlan().getContext().getLong(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE);
     long pageSize = StorageUnit.MB * 
-        subQuery.getContext().getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME); // in bytes
+        stage.getContext().getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME); // in bytes
     if (pageSize >= splitVolume) {
       throw new RuntimeException("tajo.dist-query.table-partition.task-volume-mb should be great than " +
           "tajo.shuffle.hash.appender.page.volumn-mb");
@@ -1033,11 +1033,11 @@ public class Repartitioner {
       fetchesArray[i] = new HashMap<String, List<FetchImpl>>();
       fetchesArray[i].put(tableName, entry);
 
-      SubQuery.scheduleFetches(subQuery, fetchesArray[i]);
+      Stage.scheduleFetches(stage, fetchesArray[i]);
       i++;
     }
 
-    LOG.info(subQuery.getId()
+    LOG.info(stage.getId()
         + ", ShuffleType:" + SCATTERED_HASH_SHUFFLE.name()
         + ", Intermediate Size: " + totalIntermediateSize
         + ", splitSize: " + splitVolume
@@ -1207,16 +1207,16 @@ public class Repartitioner {
     return hashed;
   }
 
-  public static SubQuery setShuffleOutputNumForTwoPhase(SubQuery subQuery, final int desiredNum, DataChannel channel) {
-    ExecutionBlock execBlock = subQuery.getBlock();
+  public static Stage setShuffleOutputNumForTwoPhase(Stage stage, final int desiredNum, DataChannel channel) {
+    ExecutionBlock execBlock = stage.getBlock();
     Column[] keys;
     // if the next query is join,
     // set the partition number for the current logicalUnit
     // TODO: the union handling is required when a join has unions as its child
-    MasterPlan masterPlan = subQuery.getMasterPlan();
+    MasterPlan masterPlan = stage.getMasterPlan();
     keys = channel.getShuffleKeys();
-    if (!masterPlan.isRoot(subQuery.getBlock()) ) {
-      ExecutionBlock parentBlock = masterPlan.getParent(subQuery.getBlock());
+    if (!masterPlan.isRoot(stage.getBlock()) ) {
+      ExecutionBlock parentBlock = masterPlan.getParent(stage.getBlock());
       if (parentBlock.getPlan().getType() == NodeType.JOIN) {
         channel.setShuffleOutputNum(desiredNum);
       }
@@ -1246,6 +1246,6 @@ public class Repartitioner {
         channel.setShuffleOutputNum(desiredNum);
       }
     }
-    return subQuery;
+    return stage;
   }
 }
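
The new Stage.java below is organized around Hadoop YARN's StateMachineFactory
(the large transition table near the top of the class). For orientation, here is
a minimal, self-contained sketch of that pattern; Job, JobState, JobEventType,
and JobEvent are hypothetical names standing in for Stage, StageState,
StageEventType, and StageEvent:

    import org.apache.hadoop.yarn.event.AbstractEvent;
    import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
    import org.apache.hadoop.yarn.state.StateMachine;
    import org.apache.hadoop.yarn.state.StateMachineFactory;

    enum JobState { NEW, RUNNING, SUCCEEDED }
    enum JobEventType { J_START, J_COMPLETED }

    class JobEvent extends AbstractEvent<JobEventType> {
      JobEvent(JobEventType type) { super(type); }
    }

    class Job {
      // The arc table is declared once per class and frozen by installTopology().
      private static final StateMachineFactory<Job, JobState, JobEventType, JobEvent>
          stateMachineFactory =
              new StateMachineFactory<Job, JobState, JobEventType, JobEvent>(JobState.NEW)
                  .addTransition(JobState.NEW, JobState.RUNNING, JobEventType.J_START)
                  .addTransition(JobState.RUNNING, JobState.SUCCEEDED, JobEventType.J_COMPLETED)
                  .installTopology();

      // Each instance gets its own mutable state machine, as Stage does with make(this).
      private final StateMachine<JobState, JobEventType, JobEvent> stateMachine =
          stateMachineFactory.make(this);

      void handle(JobEvent event) {
        try {
          stateMachine.doTransition(event.getType(), event);
        } catch (InvalidStateTransitonException e) {
          // Stage responds to an illegal transition by raising SQ_INTERNAL_ERROR.
        }
      }

      JobState getState() { return stateMachine.getCurrentState(); }
    }

Stage follows the same shape, with multi-arc transitions (MultipleArcTransition)
where one event can land in several states, and a read/write lock around
doTransition() so client reads of the state stay consistent.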

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java
new file mode 100644
index 0000000..e421417
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java
@@ -0,0 +1,1342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
+import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
+import org.apache.tajo.master.*;
+import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
+import org.apache.tajo.master.querymaster.Task.IntermediateEntry;
+import org.apache.tajo.master.container.TajoContainer;
+import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.storage.FileStorageManager;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.history.TaskHistory;
+import org.apache.tajo.util.history.StageHistory;
+import org.apache.tajo.worker.FetchImpl;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
+
+
+/**
+ * Stage plays a role in controlling an ExecutionBlock and is a finite state machine.
+ */
+public class Stage implements EventHandler<StageEvent> {
+
+  private static final Log LOG = LogFactory.getLog(Stage.class);
+
+  private MasterPlan masterPlan;
+  private ExecutionBlock block;
+  private int priority;
+  private Schema schema;
+  private TableMeta meta;
+  private TableStats resultStatistics;
+  private TableStats inputStatistics;
+  private EventHandler<Event> eventHandler;
+  private AbstractTaskScheduler taskScheduler;
+  private QueryMasterTask.QueryMasterTaskContext context;
+  private final List<String> diagnostics = new ArrayList<String>();
+  private StageState stageState;
+
+  private long startTime;
+  private long finishTime;
+
+  volatile Map<TaskId, Task> tasks = new ConcurrentHashMap<TaskId, Task>();
+  volatile Map<TajoContainerId, TajoContainer> containers = new ConcurrentHashMap<TajoContainerId,
+    TajoContainer>();
+
+  private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
+  private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+  private static final ContainerLaunchTransition CONTAINER_LAUNCH_TRANSITION = new ContainerLaunchTransition();
+  private static final TaskCompletedTransition TASK_COMPLETED_TRANSITION = new TaskCompletedTransition();
+  private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION =
+      new AllocatedContainersCancelTransition();
+  private static final StageCompleteTransition STAGE_COMPLETED_TRANSITION = new StageCompleteTransition();
+  private StateMachine<StageState, StageEventType, StageEvent> stateMachine;
+
+  protected static final StateMachineFactory<Stage, StageState,
+      StageEventType, StageEvent> stateMachineFactory =
+      new StateMachineFactory <Stage, StageState,
+          StageEventType, StageEvent> (StageState.NEW)
+
+          // Transitions from NEW state
+          .addTransition(StageState.NEW,
+              EnumSet.of(StageState.INITED, StageState.ERROR, StageState.SUCCEEDED),
+              StageEventType.SQ_INIT,
+              new InitAndRequestContainer())
+          .addTransition(StageState.NEW, StageState.NEW,
+              StageEventType.SQ_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(StageState.NEW, StageState.KILLED,
+              StageEventType.SQ_KILL)
+          .addTransition(StageState.NEW, StageState.ERROR,
+              StageEventType.SQ_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from INITED state
+          .addTransition(StageState.INITED, StageState.RUNNING,
+              StageEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINER_LAUNCH_TRANSITION)
+          .addTransition(StageState.INITED, StageState.INITED,
+              StageEventType.SQ_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(StageState.INITED, StageState.KILL_WAIT,
+              StageEventType.SQ_KILL, new KillTasksTransition())
+          .addTransition(StageState.INITED, StageState.ERROR,
+              StageEventType.SQ_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from RUNNING state
+          .addTransition(StageState.RUNNING, StageState.RUNNING,
+              StageEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINER_LAUNCH_TRANSITION)
+          .addTransition(StageState.RUNNING, StageState.RUNNING,
+              StageEventType.SQ_TASK_COMPLETED,
+              TASK_COMPLETED_TRANSITION)
+          .addTransition(StageState.RUNNING,
+              EnumSet.of(StageState.SUCCEEDED, StageState.FAILED),
+              StageEventType.SQ_STAGE_COMPLETED,
+              STAGE_COMPLETED_TRANSITION)
+          .addTransition(StageState.RUNNING, StageState.RUNNING,
+              StageEventType.SQ_FAILED,
+              TASK_COMPLETED_TRANSITION)
+          .addTransition(StageState.RUNNING, StageState.RUNNING,
+              StageEventType.SQ_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(StageState.RUNNING, StageState.KILL_WAIT,
+              StageEventType.SQ_KILL,
+              new KillTasksTransition())
+          .addTransition(StageState.RUNNING, StageState.ERROR,
+              StageEventType.SQ_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able Transition
+          .addTransition(StageState.RUNNING, StageState.RUNNING,
+              StageEventType.SQ_START)
+
+          // Transitions from KILL_WAIT state
+          .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
+              StageEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINERS_CANCEL_TRANSITION)
+          .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
+              EnumSet.of(StageEventType.SQ_KILL), new KillTasksTransition())
+          .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
+              StageEventType.SQ_TASK_COMPLETED,
+              TASK_COMPLETED_TRANSITION)
+          .addTransition(StageState.KILL_WAIT,
+              EnumSet.of(StageState.SUCCEEDED, StageState.FAILED, StageState.KILLED),
+              StageEventType.SQ_STAGE_COMPLETED,
+              STAGE_COMPLETED_TRANSITION)
+          .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
+              StageEventType.SQ_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
+              StageEventType.SQ_FAILED,
+              TASK_COMPLETED_TRANSITION)
+          .addTransition(StageState.KILL_WAIT, StageState.ERROR,
+              StageEventType.SQ_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+              // Transitions from SUCCEEDED state
+          .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED,
+              StageEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINERS_CANCEL_TRANSITION)
+          .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED,
+              StageEventType.SQ_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(StageState.SUCCEEDED, StageState.ERROR,
+              StageEventType.SQ_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+              // Ignore-able events
+          .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED,
+              EnumSet.of(
+                  StageEventType.SQ_START,
+                  StageEventType.SQ_KILL,
+                  StageEventType.SQ_CONTAINER_ALLOCATED))
+
+          // Transitions from KILLED state
+          .addTransition(StageState.KILLED, StageState.KILLED,
+              StageEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINERS_CANCEL_TRANSITION)
+          .addTransition(StageState.KILLED, StageState.KILLED,
+              StageEventType.SQ_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(StageState.KILLED, StageState.ERROR,
+              StageEventType.SQ_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+              // Ignore-able transitions
+          .addTransition(StageState.KILLED, StageState.KILLED,
+              EnumSet.of(
+                  StageEventType.SQ_START,
+                  StageEventType.SQ_KILL,
+                  StageEventType.SQ_CONTAINER_ALLOCATED,
+                  StageEventType.SQ_FAILED))
+
+          // Transitions from FAILED state
+          .addTransition(StageState.FAILED, StageState.FAILED,
+              StageEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINERS_CANCEL_TRANSITION)
+          .addTransition(StageState.FAILED, StageState.FAILED,
+              StageEventType.SQ_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(StageState.FAILED, StageState.ERROR,
+              StageEventType.SQ_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(StageState.FAILED, StageState.FAILED,
+              EnumSet.of(
+                  StageEventType.SQ_START,
+                  StageEventType.SQ_KILL,
+                  StageEventType.SQ_CONTAINER_ALLOCATED,
+                  StageEventType.SQ_FAILED))
+
+          // Transitions from ERROR state
+          .addTransition(StageState.ERROR, StageState.ERROR,
+              StageEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINERS_CANCEL_TRANSITION)
+          .addTransition(StageState.ERROR, StageState.ERROR,
+              StageEventType.SQ_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(StageState.ERROR, StageState.ERROR,
+              EnumSet.of(
+                  StageEventType.SQ_START,
+                  StageEventType.SQ_KILL,
+                  StageEventType.SQ_FAILED,
+                  StageEventType.SQ_INTERNAL_ERROR,
+                  StageEventType.SQ_STAGE_COMPLETED))
+
+          .installTopology();
+
+  private final Lock readLock;
+  private final Lock writeLock;
+
+  private int totalScheduledObjectsCount;
+  private int succeededObjectCount = 0;
+  private int completedTaskCount = 0;
+  private int succeededTaskCount = 0;
+  private int killedObjectCount = 0;
+  private int failedObjectCount = 0;
+  private TaskSchedulerContext schedulerContext;
+  private List<IntermediateEntry> hashShuffleIntermediateEntries = new ArrayList<IntermediateEntry>();
+  private AtomicInteger completeReportReceived = new AtomicInteger(0);
+  private StageHistory finalStageHistory;
+
+  public Stage(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block) {
+    this.context = context;
+    this.masterPlan = masterPlan;
+    this.block = block;
+    this.eventHandler = context.getEventHandler();
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+    stateMachine = stateMachineFactory.make(this);
+    stageState = stateMachine.getCurrentState();
+  }
+
+  public static boolean isRunningState(StageState state) {
+    return state == StageState.INITED || state == StageState.NEW || state == StageState.RUNNING;
+  }
+
+  public QueryMasterTask.QueryMasterTaskContext getContext() {
+    return context;
+  }
+
+  public MasterPlan getMasterPlan() {
+    return masterPlan;
+  }
+
+  public DataChannel getDataChannel() {
+    return masterPlan.getOutgoingChannels(getId()).iterator().next();
+  }
+
+  public EventHandler<Event> getEventHandler() {
+    return eventHandler;
+  }
+
+  public AbstractTaskScheduler getTaskScheduler() {
+    return taskScheduler;
+  }
+
+  public void setStartTime() {
+    startTime = context.getClock().getTime();
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  public void setFinishTime() {
+    finishTime = context.getClock().getTime();
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
+  public long getFinishTime() {
+    return this.finishTime;
+  }
+
+  public float getTaskProgress() {
+    readLock.lock();
+    try {
+      if (getState() == StageState.NEW) {
+        return 0;
+      } else {
+        return (float)(succeededObjectCount) / (float)totalScheduledObjectsCount;
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public float getProgress() {
+    List<Task> tempTasks = null;
+    readLock.lock();
+    try {
+      if (getState() == StageState.NEW) {
+        return 0.0f;
+      } else {
+        tempTasks = new ArrayList<Task>(tasks.values());
+      }
+    } finally {
+      readLock.unlock();
+    }
+
+    float totalProgress = 0.0f;
+    for (Task eachTask : tempTasks) {
+      if (eachTask.getLastAttempt() != null) {
+        totalProgress += eachTask.getLastAttempt().getProgress();
+      }
+    }
+
+    if (totalProgress > 0.0f) {
+      return (float) Math.floor((totalProgress / (float) Math.max(tempTasks.size(), 1)) * 1000.0f) / 1000.0f;
+    } else {
+      return 0.0f;
+    }
+  }
+
+  public int getSucceededObjectCount() {
+    return succeededObjectCount;
+  }
+
+  public int getTotalScheduledObjectsCount() {
+    return totalScheduledObjectsCount;
+  }
+
+  public ExecutionBlock getBlock() {
+    return block;
+  }
+
+  public void addTask(Task task) {
+    tasks.put(task.getId(), task);
+  }
+
+  public StageHistory getStageHistory() {
+    if (finalStageHistory != null) {
+      if (finalStageHistory.getFinishTime() == 0) {
+        finalStageHistory = makeStageHistory();
+        finalStageHistory.setTasks(makeTaskHistories());
+      }
+      return finalStageHistory;
+    } else {
+      return makeStageHistory();
+    }
+  }
+
+  private List<TaskHistory> makeTaskHistories() {
+    List<TaskHistory> taskHistories = new ArrayList<TaskHistory>();
+
+    for(Task eachTask : getTasks()) {
+      taskHistories.add(eachTask.getTaskHistory());
+    }
+
+    return taskHistories;
+  }
+
+  private StageHistory makeStageHistory() {
+    StageHistory stageHistory = new StageHistory();
+
+    stageHistory.setExecutionBlockId(getId().toString());
+    stageHistory.setPlan(PlannerUtil.buildExplainString(block.getPlan()));
+    stageHistory.setState(getState().toString());
+    stageHistory.setStartTime(startTime);
+    stageHistory.setFinishTime(finishTime);
+    stageHistory.setSucceededObjectCount(succeededObjectCount);
+    stageHistory.setKilledObjectCount(killedObjectCount);
+    stageHistory.setFailedObjectCount(failedObjectCount);
+    stageHistory.setTotalScheduledObjectsCount(totalScheduledObjectsCount);
+    stageHistory.setHostLocalAssigned(getTaskScheduler().getHostLocalAssigned());
+    stageHistory.setRackLocalAssigned(getTaskScheduler().getRackLocalAssigned());
+
+    long totalInputBytes = 0;
+    long totalReadBytes = 0;
+    long totalReadRows = 0;
+    long totalWriteBytes = 0;
+    long totalWriteRows = 0;
+    int numShuffles = 0;
+    for(Task eachTask : getTasks()) {
+      numShuffles = eachTask.getShuffleOutpuNum();
+      if (eachTask.getLastAttempt() != null) {
+        TableStats inputStats = eachTask.getLastAttempt().getInputStats();
+        if (inputStats != null) {
+          totalInputBytes += inputStats.getNumBytes();
+          totalReadBytes += inputStats.getReadBytes();
+          totalReadRows += inputStats.getNumRows();
+        }
+        TableStats outputStats = eachTask.getLastAttempt().getResultStats();
+        if (outputStats != null) {
+          totalWriteBytes += outputStats.getNumBytes();
+          totalWriteRows += outputStats.getNumRows();
+        }
+      }
+    }
+
+    stageHistory.setTotalInputBytes(totalInputBytes);
+    stageHistory.setTotalReadBytes(totalReadBytes);
+    stageHistory.setTotalReadRows(totalReadRows);
+    stageHistory.setTotalWriteBytes(totalWriteBytes);
+    stageHistory.setTotalWriteRows(totalWriteRows);
+    stageHistory.setNumShuffles(numShuffles);
+    stageHistory.setProgress(getProgress());
+    return stageHistory;
+  }
+
+  /**
+   * It finalizes this stage. It is invoked only when the stage has succeeded.
+   */
+  public void complete() {
+    cleanup();
+    finalizeStats();
+    setFinishTime();
+    eventHandler.handle(new StageCompletedEvent(getId(), StageState.SUCCEEDED));
+  }
+
+  /**
+   * It finalizes this stage. Unlike {@link Stage#complete()},
+   * it is invoked when a stage is abnormally finished.
+   *
+   * @param finalState The final stage state
+   */
+  public void abort(StageState finalState) {
+    // TODO -
+    // - committer.abortStage(...)
+    // - record Stage Finish Time
+    // - CleanUp Tasks
+    // - Record History
+    cleanup();
+    setFinishTime();
+    eventHandler.handle(new StageCompletedEvent(getId(), finalState));
+  }
+
+  public StateMachine<StageState, StageEventType, StageEvent> getStateMachine() {
+    return this.stateMachine;
+  }
+
+  public void setPriority(int priority) {
+    this.priority = priority;
+  }
+
+
+  public int getPriority() {
+    return this.priority;
+  }
+
+  public ExecutionBlockId getId() {
+    return block.getId();
+  }
+  
+  public Task[] getTasks() {
+    return tasks.values().toArray(new Task[tasks.size()]);
+  }
+  
+  public Task getTask(TaskId qid) {
+    return tasks.get(qid);
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public TableMeta getTableMeta() {
+    return meta;
+  }
+
+  public TableStats getResultStats() {
+    return resultStatistics;
+  }
+
+  public TableStats getInputStats() {
+    return inputStatistics;
+  }
+
+  public List<String> getDiagnostics() {
+    readLock.lock();
+    try {
+      return diagnostics;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  protected void addDiagnostic(String diag) {
+    diagnostics.add(diag);
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(this.getId());
+    return sb.toString();
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof Stage) {
+      Stage other = (Stage)o;
+      return getId().equals(other.getId());
+    }
+    return false;
+  }
+  
+  @Override
+  public int hashCode() {
+    return getId().hashCode();
+  }
+  
+  public int compareTo(Stage other) {
+    return getId().compareTo(other.getId());
+  }
+
+  public StageState getSynchronizedState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /* non-blocking call for client API */
+  public StageState getState() {
+    return stageState;
+  }
+
+  public static TableStats[] computeStatFromUnionBlock(Stage stage) {
+    TableStats[] stat = new TableStats[]{new TableStats(), new TableStats()};
+    long[] avgRows = new long[]{0, 0};
+    long[] numBytes = new long[]{0, 0};
+    long[] readBytes = new long[]{0, 0};
+    long[] numRows = new long[]{0, 0};
+    int[] numBlocks = new int[]{0, 0};
+    int[] numOutputs = new int[]{0, 0};
+
+    List<ColumnStats> columnStatses = Lists.newArrayList();
+
+    MasterPlan masterPlan = stage.getMasterPlan();
+    Iterator<ExecutionBlock> it = masterPlan.getChilds(stage.getBlock()).iterator();
+    while (it.hasNext()) {
+      ExecutionBlock block = it.next();
+      Stage childStage = stage.context.getStage(block.getId());
+      TableStats[] childStatArray = new TableStats[]{
+          childStage.getInputStats(), childStage.getResultStats()
+      };
+      for (int i = 0; i < 2; i++) {
+        if (childStatArray[i] == null) {
+          continue;
+        }
+        avgRows[i] += childStatArray[i].getAvgRows();
+        numBlocks[i] += childStatArray[i].getNumBlocks();
+        numBytes[i] += childStatArray[i].getNumBytes();
+        readBytes[i] += childStatArray[i].getReadBytes();
+        numOutputs[i] += childStatArray[i].getNumShuffleOutputs();
+        numRows[i] += childStatArray[i].getNumRows();
+      }
+      columnStatses.addAll(childStatArray[1].getColumnStats());
+    }
+
+    for (int i = 0; i < 2; i++) {
+      stat[i].setNumBlocks(numBlocks[i]);
+      stat[i].setNumBytes(numBytes[i]);
+      stat[i].setReadBytes(readBytes[i]);
+      stat[i].setNumShuffleOutputs(numOutputs[i]);
+      stat[i].setNumRows(numRows[i]);
+      stat[i].setAvgRows(avgRows[i]);
+    }
+    stat[1].setColumnStats(columnStatses);
+
+    return stat;
+  }
+
+  private TableStats[] computeStatFromTasks() {
+    List<TableStats> inputStatsList = Lists.newArrayList();
+    List<TableStats> resultStatsList = Lists.newArrayList();
+    for (Task unit : getTasks()) {
+      resultStatsList.add(unit.getStats());
+      if (unit.getLastAttempt().getInputStats() != null) {
+        inputStatsList.add(unit.getLastAttempt().getInputStats());
+      }
+    }
+    TableStats inputStats = StatisticsUtil.aggregateTableStat(inputStatsList);
+    TableStats resultStats = StatisticsUtil.aggregateTableStat(resultStatsList);
+    return new TableStats[]{inputStats, resultStats};
+  }
+
+  private void stopScheduler() {
+    // If there are launched TaskRunners, send the 'shouldDie' message to all runners
+    // via received task requests.
+    if (taskScheduler != null) {
+      taskScheduler.stop();
+    }
+  }
+
+  private void releaseContainers() {
+    // If there are still live TaskRunners, try to kill the containers.
+    eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values()));
+  }
+
+  /**
+   * Computes all statistics and records the output schema, table meta, and
+   * input/result statistics of this stage.
+   */
+  private void finalizeStats() {
+    TableStats[] statsArray;
+    if (block.hasUnion()) {
+      statsArray = computeStatFromUnionBlock(this);
+    } else {
+      statsArray = computeStatFromTasks();
+    }
+
+    DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0);
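+    // The first outgoing channel determines the output schema of this stage.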
+
+    // if store plan (i.e., CREATE or INSERT OVERWRITE)
+    StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan());
+    if (storeType == null) {
+      // fall back to the default store type
+      storeType = StoreType.CSV;
+    }
+
+    schema = channel.getSchema();
+    meta = CatalogUtil.newTableMeta(storeType, new KeyValueSet());
+    inputStatistics = statsArray[0];
+    resultStatistics = statsArray[1];
+  }
+
+  @Override
+  public void handle(StageEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getStageId() + " of type " + event.getType() + ", preState="
+          + getSynchronizedState());
+    }
+
+    writeLock.lock();
+    try {
+      StageState oldState = getSynchronizedState();
+      try {
+        getStateMachine().doTransition(event.getType(), event);
+        stageState = getSynchronizedState();
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state"
+            + ", eventType:" + event.getType().name()
+            + ", oldState:" + oldState.name()
+            + ", nextState:" + getSynchronizedState().name()
+            , e);
+        eventHandler.handle(new StageEvent(getId(),
+            StageEventType.SQ_INTERNAL_ERROR));
+      }
+
+      // notify the eventhandler of state change
+      if (LOG.isDebugEnabled()) {
+        if (oldState != getSynchronizedState()) {
+          LOG.debug(getId() + " Stage Transitioned from " + oldState + " to "
+              + getSynchronizedState());
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void handleTaskRequestEvent(TaskRequestEvent event) {
+    taskScheduler.handleTaskRequestEvent(event);
+  }
+
+  private static class InitAndRequestContainer implements MultipleArcTransition<Stage,
+      StageEvent, StageState> {
+
+    @Override
+    public StageState transition(final Stage stage, StageEvent stageEvent) {
+      stage.setStartTime();
+      ExecutionBlock execBlock = stage.getBlock();
+      StageState state;
+
+      try {
+        // Union operator does not require actual query processing. It is performed logically.
+        if (execBlock.hasUnion()) {
+          stage.finalizeStats();
+          state = StageState.SUCCEEDED;
+        } else {
+          // execute pre-processing asynchronously
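+          // The pre-processing below sets up the shuffle, creates the task scheduler,
+          // and schedules all tasks before any containers are requested.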
+          stage.getContext().getQueryMasterContext().getEventExecutor()
+              .submit(new Runnable() {
+                        @Override
+                        public void run() {
+                          try {
+                            ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock());
+                            DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId());
+                            setShuffleIfNecessary(stage, channel);
+                            initTaskScheduler(stage);
+                            schedule(stage);
+                            stage.totalScheduledObjectsCount = stage.getTaskScheduler().remainingScheduledObjectNum();
+                            LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled");
+
+                            if (stage.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there are no tasks
+                              stage.complete();
+                            } else {
+                              if(stage.getSynchronizedState() == StageState.INITED) {
+                                stage.taskScheduler.start();
+                                allocateContainers(stage);
+                              } else {
+                                stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
+                              }
+                            }
+                          } catch (Throwable e) {
+                            LOG.error("Stage (" + stage.getId() + ") ERROR: ", e);
+                            stage.setFinishTime();
+                            stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), e.getMessage()));
+                            stage.eventHandler.handle(new StageCompletedEvent(stage.getId(), StageState.ERROR));
+                          }
+                        }
+                      }
+              );
+          state = StageState.INITED;
+        }
+      } catch (Throwable e) {
+        LOG.error("Stage (" + stage.getId() + ") ERROR: ", e);
+        stage.setFinishTime();
+        stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), e.getMessage()));
+        stage.eventHandler.handle(new StageCompletedEvent(stage.getId(), StageState.ERROR));
+        return StageState.ERROR;
+      }
+
+      return state;
+    }
+
+    private void initTaskScheduler(Stage stage) throws IOException {
+      TajoConf conf = stage.context.getConf();
+      stage.schedulerContext = new TaskSchedulerContext(stage.context,
+          stage.getMasterPlan().isLeaf(stage.getId()), stage.getId());
+      stage.taskScheduler = TaskSchedulerFactory.get(conf, stage.schedulerContext, stage);
+      stage.taskScheduler.init(conf);
+      LOG.info(stage.taskScheduler.getName() + " is chosen for the task scheduling for " + stage.getId());
+    }
+
+    /**
+     * If the parent block requires a repartition operation, this method sets the proper
+     * repartition method and the number of partitions for the given Stage.
+     */
+    private static void setShuffleIfNecessary(Stage stage, DataChannel channel) {
+      if (channel.getShuffleType() != ShuffleType.NONE_SHUFFLE) {
+        int numTasks = calculateShuffleOutputNum(stage, channel);
+        Repartitioner.setShuffleOutputNumForTwoPhase(stage, numTasks, channel);
+      }
+    }
+
+    /**
+     * Gets the total memory of the cluster.
+     *
+     * @param stage the current stage
+     * @return the total memory in megabytes
+     */
+    private static int getClusterTotalMemory(Stage stage) {
+      List<TajoMasterProtocol.WorkerResourceProto> workers =
+          stage.context.getQueryMasterContext().getQueryMaster().getAllWorker();
+
+      int totalMem = 0;
+      for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+        totalMem += worker.getMemoryMB();
+      }
+      return totalMem;
+    }
+
+    /**
+     * Gets the desired number of partitions according to the volume of input data.
+     * This method is only used to determine the partition key number of a hash join or aggregation.
+     *
+     * @param stage the current stage
+     * @param channel the outgoing data channel of this stage
+     * @return the determined number of shuffle output partitions
+     */
+    public static int calculateShuffleOutputNum(Stage stage, DataChannel channel) {
+      TajoConf conf = stage.context.getConf();
+      MasterPlan masterPlan = stage.getMasterPlan();
+      ExecutionBlock parent = masterPlan.getParent(stage.getBlock());
+
+      LogicalNode grpNode = null;
+      if (parent != null) {
+        grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY);
+        if (grpNode == null) {
+          grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.DISTINCT_GROUP_BY);
+        }
+      }
+
+      // We assume this execution block is the first stage of a join if two or more
+      // tables are included in this block.
+      if (parent != null && parent.getScanNodes().length >= 2) {
+        List<ExecutionBlock> childs = masterPlan.getChilds(parent);
+
+        // for outer
+        ExecutionBlock outer = childs.get(0);
+        long outerVolume = getInputVolume(stage.masterPlan, stage.context, outer);
+
+        // for inner
+        ExecutionBlock inner = childs.get(1);
+        long innerVolume = getInputVolume(stage.masterPlan, stage.context, inner);
+        LOG.info(stage.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, "
+            + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB");
+
+        long bigger = Math.max(outerVolume, innerVolume);
+
+        int mb = (int) Math.ceil((double) bigger / 1048576);
+        LOG.info(stage.getId() + ", Bigger Table's volume is approximately " + mb + " MB");
+
+        int taskNum = (int) Math.ceil((double) mb / masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE));
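+        // For example, with JOIN_PER_SHUFFLE_SIZE set to 128 (MB), a 1024 MB bigger
+        // table yields ceil(1024 / 128) = 8 join partitions.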
+
+        if (masterPlan.getContext().containsKey(SessionVars.TEST_MIN_TASK_NUM)) {
+          taskNum = masterPlan.getContext().getInt(SessionVars.TEST_MIN_TASK_NUM);
+          LOG.warn("!!!!! TESTCASE MODE !!!!!");
+        }
+
+        // The shuffle output numbers of a join may be inconsistent depending on the
+        // execution block order. Thus, we compare the computed number against the
+        // DataChannel output numbers and, on mismatch, adopt the larger channel value.
+        int outerShuffleOutputNum = 0, innerShuffleOutputNum = 0;
+        for (DataChannel eachChannel : masterPlan.getOutgoingChannels(outer.getId())) {
+          outerShuffleOutputNum = Math.max(outerShuffleOutputNum, eachChannel.getShuffleOutputNum());
+        }
+        for (DataChannel eachChannel : masterPlan.getOutgoingChannels(inner.getId())) {
+          innerShuffleOutputNum = Math.max(innerShuffleOutputNum, eachChannel.getShuffleOutputNum());
+        }
+        if (outerShuffleOutputNum != innerShuffleOutputNum
+            && taskNum != outerShuffleOutputNum
+            && taskNum != innerShuffleOutputNum) {
+          LOG.info(stage.getId() + ", Change determined number of join partitions cause difference of outputNum" +
+                  ", originTaskNum=" + taskNum + ", changedTaskNum=" + Math.max(outerShuffleOutputNum, innerShuffleOutputNum) +
+                  ", outerShuffleOutptNum=" + outerShuffleOutputNum +
+                  ", innerShuffleOutputNum=" + innerShuffleOutputNum);
+          taskNum = Math.max(outerShuffleOutputNum, innerShuffleOutputNum);
+        }
+
+        LOG.info(stage.getId() + ", The determined number of join partitions is " + taskNum);
+
+        return taskNum;
+        // Is this stage the first step of group-by?
+      } else if (grpNode != null) {
+        boolean hasGroupColumns = true;
+        if (grpNode.getType() == NodeType.GROUP_BY) {
+          hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0;
+        } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) {
+          // Find current distinct stage node.
+          DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(stage.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
+          if (distinctNode == null) {
+            LOG.warn(stage.getId() + ", Can't find current DistinctGroupbyNode");
+            distinctNode = (DistinctGroupbyNode)grpNode;
+          }
+          hasGroupColumns = distinctNode.getGroupingColumns().length > 0;
+
+          Enforcer enforcer = stage.getBlock().getEnforcer();
+          if (enforcer == null) {
+            LOG.warn(stage.getId() + ", DistinctGroupbyNode's enforcer is null.");
+          }
+          EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode);
+          if (property != null) {
+            if (property.getDistinct().getIsMultipleAggregation()) {
+              MultipleAggregationStage multiAggStage = property.getDistinct().getMultipleAggregationStage();
+              if (multiAggStage != MultipleAggregationStage.THRID_STAGE) {
+                hasGroupColumns = true;
+              }
+            }
+          }
+        }
+        if (!hasGroupColumns) {
+          LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
+          return 1;
+        } else {
+          long volume = getInputVolume(stage.masterPlan, stage.context, stage.block);
+
+          int volumeByMB = (int) Math.ceil((double) volume / StorageUnit.MB);
+          LOG.info(stage.getId() + ", Table's volume is approximately " + volumeByMB + " MB");
+          // determine the number of task
+          int taskNum = (int) Math.ceil((double) volumeByMB /
+              masterPlan.getContext().getInt(SessionVars.GROUPBY_PER_SHUFFLE_SIZE));
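+          // For example, with GROUPBY_PER_SHUFFLE_SIZE set to 256 (MB), a 512 MB
+          // input yields ceil(512 / 256) = 2 aggregation partitions.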
+          LOG.info(stage.getId() + ", The determined number of aggregation partitions is " + taskNum);
+          return taskNum;
+        }
+      } else {
+        LOG.info("============>>>>> Unexpected Case! <<<<<================");
+        long volume = getInputVolume(stage.masterPlan, stage.context, stage.block);
+
+        int mb = (int) Math.ceil((double)volume / 1048576);
+        LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB");
+        // determine the number of tasks, one per 128 MB
+        int taskNum = (int) Math.ceil((double)mb / 128);
+        LOG.info(stage.getId() + ", The determined number of partitions is " + taskNum);
+        return taskNum;
+      }
+    }
+
+    private static void schedule(Stage stage) throws IOException {
+      MasterPlan masterPlan = stage.getMasterPlan();
+      ExecutionBlock execBlock = stage.getBlock();
+      if (stage.getMasterPlan().isLeaf(execBlock.getId()) && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan
+        scheduleFragmentsForLeafQuery(stage);
+      } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join
+        Repartitioner.scheduleFragmentsForJoinQuery(stage.schedulerContext, stage);
+      } else { // Case 3: Others (Sort or Aggregation)
+        int numTasks = getNonLeafTaskNum(stage);
+        Repartitioner.scheduleFragmentsForNonLeafTasks(stage.schedulerContext, masterPlan, stage, numTasks);
+      }
+    }
+
+    /**
+     * Gets the desired number of tasks according to the volume of input data.
+     *
+     * @param stage the current stage
+     * @return the determined number of non-leaf tasks
+     */
+    public static int getNonLeafTaskNum(Stage stage) {
+      // Get the intermediate data size
+      long volume = getInputVolume(stage.getMasterPlan(), stage.context, stage.getBlock());
+
+      int mb = (int) Math.ceil((double)volume / 1048576);
+      LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB");
+      // determine the number of tasks, one per 64 MB
+      int maxTaskNum = Math.max(1, (int) Math.ceil((double)mb / 64));
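+      // For example, 1024 MB of intermediate data yields max(1, ceil(1024 / 64)) = 16 tasks.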
+      LOG.info(stage.getId() + ", The determined number of non-leaf tasks is " + maxTaskNum);
+      return maxTaskNum;
+    }
+
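+    /**
+     * Computes the input volume of an execution block: for a leaf block, the size of
+     * its largest input table; otherwise, the sum over its child blocks, using a
+     * child's actual result size whenever that child stage has already succeeded.
+     */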
+    public static long getInputVolume(MasterPlan masterPlan, QueryMasterTask.QueryMasterTaskContext context,
+                                      ExecutionBlock execBlock) {
+      Map<String, TableDesc> tableMap = context.getTableDescMap();
+      if (masterPlan.isLeaf(execBlock)) {
+        ScanNode[] outerScans = execBlock.getScanNodes();
+        long maxVolume = 0;
+        for (ScanNode eachScanNode: outerScans) {
+          TableStats stat = tableMap.get(eachScanNode.getCanonicalName()).getStats();
+          if (stat.getNumBytes() > maxVolume) {
+            maxVolume = stat.getNumBytes();
+          }
+        }
+        return maxVolume;
+      } else {
+        long aggregatedVolume = 0;
+        for (ExecutionBlock childBlock : masterPlan.getChilds(execBlock)) {
+          Stage stage = context.getStage(childBlock.getId());
+          if (stage == null || stage.getSynchronizedState() != StageState.SUCCEEDED) {
+            aggregatedVolume += getInputVolume(masterPlan, context, childBlock);
+          } else {
+            aggregatedVolume += stage.getResultStats().getNumBytes();
+          }
+        }
+
+        return aggregatedVolume;
+      }
+    }
+
+    public static void allocateContainers(Stage stage) {
+      ExecutionBlock execBlock = stage.getBlock();
+
+      //TODO consider disk slot
+      int requiredMemoryMBPerTask = 512;
+
+      int numRequest = stage.getContext().getResourceAllocator().calculateNumRequestContainers(
+          stage.getContext().getQueryMasterContext().getWorkerContext(),
+          stage.schedulerContext.getEstimatedTaskNum(),
+          requiredMemoryMBPerTask
+      );
+
+      final Resource resource = Records.newRecord(Resource.class);
+
+      resource.setMemory(requiredMemoryMBPerTask);
+
+      LOG.info("Request Container for " + stage.getId() + " containers=" + numRequest);
+
+      Priority priority = Records.newRecord(Priority.class);
+      priority.setPriority(stage.getPriority());
+      ContainerAllocationEvent event =
+          new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
+              stage.getId(), priority, resource, numRequest,
+              stage.masterPlan.isLeaf(execBlock), 0.0f);
+      stage.eventHandler.handle(event);
+    }
+
+    private static void scheduleFragmentsForLeafQuery(Stage stage) throws IOException {
+      ExecutionBlock execBlock = stage.getBlock();
+      ScanNode[] scans = execBlock.getScanNodes();
+      Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
+      ScanNode scan = scans[0];
+      TableDesc table = stage.context.getTableDescMap().get(scan.getCanonicalName());
+
+      Collection<Fragment> fragments;
+      TableMeta meta = table.getMeta();
+
+      // Depending on the scan node's type, this creates fragments. If the scan is
+      // for a partitioned table, it creates many fragments covering all partitions.
+      // Otherwise, it creates at least one fragment for the table, which may span
+      // a number of blocks or consist of a number of files.
+      if (scan.getType() == NodeType.PARTITIONS_SCAN) {
+        // After calling this method, partition paths are removed from the physical plan.
+        FileStorageManager storageManager =
+            (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf());
+        fragments = Repartitioner.getFragmentsFromPartitionedTable(storageManager, scan, table);
+      } else {
+        StorageManager storageManager =
+            StorageManager.getStorageManager(stage.getContext().getConf(), meta.getStoreType());
+        fragments = storageManager.getSplits(scan.getCanonicalName(), table, scan);
+      }
+
+      Stage.scheduleFragments(stage, fragments);
+      if (stage.getTaskScheduler() instanceof DefaultTaskScheduler) {
+        // A leaf task of DefaultTaskScheduler is assigned one fragment, so the
+        // estimated task number (which determines the initial container count) equals the fragment count.
+        stage.schedulerContext.setEstimatedTaskNum(fragments.size());
+      } else {
+        TajoConf conf = stage.context.getConf();
+        stage.schedulerContext.setTaskSize(conf.getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024);
+        int estimatedTaskNum = (int) Math.ceil((double) table.getStats().getNumBytes() /
+            (double) stage.schedulerContext.getTaskSize());
+        stage.schedulerContext.setEstimatedTaskNum(estimatedTaskNum);
+      }
+    }
+  }
+
+  public static void scheduleFragment(Stage stage, Fragment fragment) {
+    stage.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
+        stage.getId(), fragment));
+  }
+
+  public static void scheduleFragments(Stage stage, Collection<Fragment> fragments) {
+    for (Fragment eachFragment : fragments) {
+      scheduleFragment(stage, eachFragment);
+    }
+  }
+
+  public static void scheduleFragments(Stage stage, Collection<Fragment> leftFragments,
+                                       Collection<Fragment> broadcastFragments) {
+    for (Fragment eachLeafFragment : leftFragments) {
+      scheduleFragment(stage, eachLeafFragment, broadcastFragments);
+    }
+  }
+
+  public static void scheduleFragment(Stage stage,
+                                      Fragment leftFragment, Collection<Fragment> rightFragments) {
+    stage.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
+        stage.getId(), leftFragment, rightFragments));
+  }
+
+  public static void scheduleFetches(Stage stage, Map<String, List<FetchImpl>> fetches) {
+    stage.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
+        stage.getId(), fetches));
+  }
+
+  public static Task newEmptyTask(TaskSchedulerContext schedulerContext,
+                                  TaskAttemptScheduleContext taskContext,
+                                  Stage stage, int taskId) {
+    ExecutionBlock execBlock = stage.getBlock();
+    Task unit = new Task(schedulerContext.getMasterContext().getConf(),
+        taskContext,
+        QueryIdFactory.newTaskId(schedulerContext.getBlockId(), taskId),
+        schedulerContext.isLeafQuery(), stage.eventHandler);
+    unit.setLogicalPlan(execBlock.getPlan());
+    stage.addTask(unit);
+    return unit;
+  }
+
+  private static class ContainerLaunchTransition
+      implements SingleArcTransition<Stage, StageEvent> {
+
+    @Override
+    public void transition(Stage stage, StageEvent event) {
+      try {
+        StageContainerAllocationEvent allocationEvent =
+            (StageContainerAllocationEvent) event;
+        for (TajoContainer container : allocationEvent.getAllocatedContainer()) {
+          TajoContainerId cId = container.getId();
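+          // A duplicate allocation indicates an inconsistent container state; report
+          // it and raise an internal error so the stage aborts.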
+          if (stage.containers.containsKey(cId)) {
+            stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(),
+                "Duplicated containers are allocated: " + cId.toString()));
+            stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR));
+          }
+          stage.containers.put(cId, container);
+        }
+        LOG.info("Stage (" + stage.getId() + ") has " + stage.containers.size() + " containers!");
+        stage.eventHandler.handle(
+            new LaunchTaskRunnersEvent(stage.getId(), allocationEvent.getAllocatedContainer(),
+                stage.getContext().getQueryContext(),
+                CoreGsonHelper.toJson(stage.getBlock().getPlan(), LogicalNode.class))
+        );
+
+        stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_START));
+      } catch (Throwable t) {
+        stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(),
+            ExceptionUtils.getStackTrace(t)));
+        stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR));
+      }
+    }
+  }
+
+  /**
+   * It is used in the KILL_WAIT state against a Container Allocation event.
+   * It just returns the allocated containers to the resource manager.
+   */
+  private static class AllocatedContainersCancelTransition implements SingleArcTransition<Stage, StageEvent> {
+    @Override
+    public void transition(Stage stage, StageEvent event) {
+      try {
+        StageContainerAllocationEvent allocationEvent =
+            (StageContainerAllocationEvent) event;
+        stage.eventHandler.handle(
+            new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP,
+                stage.getId(), allocationEvent.getAllocatedContainer()));
+        LOG.info(String.format("[%s] %d allocated containers are canceled",
+            stage.getId().toString(),
+            allocationEvent.getAllocatedContainer().size()));
+      } catch (Throwable t) {
+        stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(),
+            ExceptionUtils.getStackTrace(t)));
+        stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR));
+      }
+    }
+  }
+
+  private static class TaskCompletedTransition implements SingleArcTransition<Stage, StageEvent> {
+
+    @Override
+    public void transition(Stage stage,
+                           StageEvent event) {
+      StageTaskEvent taskEvent = (StageTaskEvent) event;
+      Task task = stage.getTask(taskEvent.getTaskId());
+
+      if (task == null) { // the completed task is unknown to this stage
+        LOG.error(String.format("Task %s is absent", taskEvent.getTaskId()));
+        stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED));
+      } else {
+        stage.completedTaskCount++;
+
+        if (taskEvent.getState() == TaskState.SUCCEEDED) {
+          stage.succeededObjectCount++;
+        } else if (task.getState() == TaskState.KILLED) {
+          stage.killedObjectCount++;
+        } else if (task.getState() == TaskState.FAILED) {
+          stage.failedObjectCount++;
+          // if at least one task is failed, try to kill all tasks.
+          stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
+        }
+
+        LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)",
+            stage.getId(),
+            stage.getTotalScheduledObjectsCount(),
+            stage.succeededObjectCount,
+            stage.killedObjectCount,
+            stage.failedObjectCount));
+
+        if (stage.totalScheduledObjectsCount ==
+            stage.succeededObjectCount + stage.killedObjectCount + stage.failedObjectCount) {
+          stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+        }
+      }
+    }
+  }
+
+  private static class KillTasksTransition implements SingleArcTransition<Stage, StageEvent> {
+
+    @Override
+    public void transition(Stage stage, StageEvent stageEvent) {
+      if(stage.getTaskScheduler() != null){
+        stage.getTaskScheduler().stop();
+      }
+
+      for (Task task : stage.getTasks()) {
+        stage.eventHandler.handle(new TaskEvent(task.getId(), TaskEventType.T_KILL));
+      }
+    }
+  }
+
+  private void cleanup() {
+    stopScheduler();
+    releaseContainers();
+
+    if (!getContext().getQueryContext().getBool(SessionVars.DEBUG_ENABLED)) {
+      List<ExecutionBlock> childs = getMasterPlan().getChilds(getId());
+      List<TajoIdProtos.ExecutionBlockIdProto> ebIds = Lists.newArrayList();
+
+      for (ExecutionBlock executionBlock : childs) {
+        ebIds.add(executionBlock.getId().getProto());
+      }
+
+      getContext().getQueryMasterContext().getQueryMaster().cleanupExecutionBlock(ebIds);
+    }
+
+    this.finalStageHistory = makeStageHistory();
+    this.finalStageHistory.setTasks(makeTaskHistories());
+  }
+
+  public List<IntermediateEntry> getHashShuffleIntermediateEntries() {
+    return hashShuffleIntermediateEntries;
+  }
+
+  protected void waitingIntermediateReport() {
+    LOG.info(getId() + ", waiting IntermediateReport: expectedTaskNum=" + completeReportReceived.get());
+    synchronized(completeReportReceived) {
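+      // completeReportReceived doubles as the wait/notify monitor; see
+      // receiveExecutionBlockReport(), which increments it and calls notifyAll().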
+      long startTime = System.currentTimeMillis();
+      while (true) {
+        if (completeReportReceived.get() >= tasks.size()) {
+          LOG.info(getId() + ", completed waiting IntermediateReport");
+          return;
+        } else {
+          try {
+            completeReportReceived.wait(10 * 1000);
+          } catch (InterruptedException e) {
+            // Ignored; the loop re-checks the received count and the timeout below.
+          }
+          long elapsedTime = System.currentTimeMillis() - startTime;
+          if (elapsedTime >= 120 * 1000) {
+            LOG.error(getId() + ", Timeout while receiving intermediate reports: " + elapsedTime + " ms");
+            abort(StageState.FAILED);
+            return;
+          }
+        }
+      }
+    }
+  }
+
+  public void receiveExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport report) {
+    LOG.info(getId() + ", receiveExecutionBlockReport:" +  report.getSucceededTasks());
+    if (!report.getReportSuccess()) {
+      LOG.error(getId() + ", ExecutionBlock final report fail cause:" + report.getReportErrorMessage());
+      abort(StageState.FAILED);
+      return;
+    }
+    if (report.getIntermediateEntriesCount() > 0) {
+      synchronized (hashShuffleIntermediateEntries) {
+        for (IntermediateEntryProto eachInterm: report.getIntermediateEntriesList()) {
+          hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm));
+        }
+      }
+    }
+    synchronized(completeReportReceived) {
+      completeReportReceived.addAndGet(report.getSucceededTasks());
+      completeReportReceived.notifyAll();
+    }
+  }
+
+  private static class StageCompleteTransition implements MultipleArcTransition<Stage, StageEvent, StageState> {
+
+    @Override
+    public StageState transition(Stage stage, StageEvent stageEvent) {
+      // TODO - Commit Stage
+      // TODO - records succeeded, failed, killed completed task
+      // TODO - records metrics
+      try {
+        LOG.info(String.format("Stage completed - %s (total=%d, success=%d, killed=%d)",
+            stage.getId().toString(),
+            stage.getTotalScheduledObjectsCount(),
+            stage.getSucceededObjectCount(),
+            stage.killedObjectCount));
+
+        if (stage.killedObjectCount > 0 || stage.failedObjectCount > 0) {
+          if (stage.failedObjectCount > 0) {
+            stage.abort(StageState.FAILED);
+            return StageState.FAILED;
+          } else if (stage.killedObjectCount > 0) {
+            stage.abort(StageState.KILLED);
+            return StageState.KILLED;
+          } else {
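+            // Unreachable given the enclosing condition; kept as a defensive guard.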
+            LOG.error("Invalid State " + stage.getSynchronizedState() + " State");
+            stage.abort(StageState.ERROR);
+            return StageState.ERROR;
+          }
+        } else {
+          stage.complete();
+          return StageState.SUCCEEDED;
+        }
+      } catch (Throwable t) {
+        LOG.error(t.getMessage(), t);
+        stage.abort(StageState.ERROR);
+        return StageState.ERROR;
+      }
+    }
+  }
+
+  private static class DiagnosticsUpdateTransition implements SingleArcTransition<Stage, StageEvent> {
+    @Override
+    public void transition(Stage stage, StageEvent event) {
+      stage.addDiagnostic(((StageDiagnosticsUpdateEvent) event).getDiagnosticUpdate());
+    }
+  }
+
+  private static class InternalErrorTransition implements SingleArcTransition<Stage, StageEvent> {
+    @Override
+    public void transition(Stage stage, StageEvent stageEvent) {
+      stage.abort(StageState.ERROR);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java
new file mode 100644
index 0000000..82a06fe
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+public enum StageState {
+  NEW,
+  INITED,
+  RUNNING,
+  SUCCEEDED,
+  FAILED,
+  KILL_WAIT,
+  KILLED,
+  ERROR
+}