You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/18 04:57:10 UTC

[4/6] tajo git commit: TAJO-324: Rename the prefix 'QueryUnit' to Task.

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 7f05fa4..d4cc6e7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -23,8 +23,6 @@ import com.google.common.collect.Lists;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
@@ -55,8 +53,8 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
 import org.apache.tajo.master.*;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
 import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
-import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
+import org.apache.tajo.master.querymaster.Task.IntermediateEntry;
 import org.apache.tajo.master.container.TajoContainer;
 import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.storage.FileStorageManager;
@@ -66,7 +64,7 @@ import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.history.QueryUnitHistory;
+import org.apache.tajo.util.history.TaskHistory;
 import org.apache.tajo.util.history.SubQueryHistory;
 import org.apache.tajo.worker.FetchImpl;
 
@@ -105,7 +103,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private long startTime;
   private long finishTime;
 
-  volatile Map<QueryUnitId, QueryUnit> tasks = new ConcurrentHashMap<QueryUnitId, QueryUnit>();
+  volatile Map<TaskId, Task> tasks = new ConcurrentHashMap<TaskId, Task>();
   volatile Map<TajoContainerId, TajoContainer> containers = new ConcurrentHashMap<TajoContainerId,
     TajoContainer>();
 
@@ -355,22 +353,22 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   }
 
   public float getProgress() {
-    List<QueryUnit> tempTasks = null;
+    List<Task> tempTasks = null;
     readLock.lock();
     try {
       if (getState() == SubQueryState.NEW) {
         return 0.0f;
       } else {
-        tempTasks = new ArrayList<QueryUnit>(tasks.values());
+        tempTasks = new ArrayList<Task>(tasks.values());
       }
     } finally {
       readLock.unlock();
     }
 
     float totalProgress = 0.0f;
-    for (QueryUnit eachQueryUnit: tempTasks) {
-      if (eachQueryUnit.getLastAttempt() != null) {
-        totalProgress += eachQueryUnit.getLastAttempt().getProgress();
+    for (Task eachTask : tempTasks) {
+      if (eachTask.getLastAttempt() != null) {
+        totalProgress += eachTask.getLastAttempt().getProgress();
       }
     }
 
@@ -393,7 +391,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     return block;
   }
 
-  public void addTask(QueryUnit task) {
+  public void addTask(Task task) {
     tasks.put(task.getId(), task);
   }
 
@@ -401,7 +399,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     if (finalSubQueryHistory != null) {
       if (finalSubQueryHistory.getFinishTime() == 0) {
         finalSubQueryHistory = makeSubQueryHistory();
-        finalSubQueryHistory.setQueryUnits(makeQueryUnitHistories());
+        finalSubQueryHistory.setTasks(makeTaskHistories());
       }
       return finalSubQueryHistory;
     } else {
@@ -409,14 +407,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
   }
 
-  private List<QueryUnitHistory> makeQueryUnitHistories() {
-    List<QueryUnitHistory> queryUnitHistories = new ArrayList<QueryUnitHistory>();
+  private List<TaskHistory> makeTaskHistories() {
+    List<TaskHistory> taskHistories = new ArrayList<TaskHistory>();
 
-    for(QueryUnit eachQueryUnit: getQueryUnits()) {
-      queryUnitHistories.add(eachQueryUnit.getQueryUnitHistory());
+    for(Task eachTask : getTasks()) {
+      taskHistories.add(eachTask.getTaskHistory());
     }
 
-    return queryUnitHistories;
+    return taskHistories;
   }
 
   private SubQueryHistory makeSubQueryHistory() {
@@ -440,16 +438,16 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     long totalWriteBytes = 0;
     long totalWriteRows = 0;
     int numShuffles = 0;
-    for(QueryUnit eachQueryUnit: getQueryUnits()) {
-      numShuffles = eachQueryUnit.getShuffleOutpuNum();
-      if (eachQueryUnit.getLastAttempt() != null) {
-        TableStats inputStats = eachQueryUnit.getLastAttempt().getInputStats();
+    for(Task eachTask : getTasks()) {
+      numShuffles = eachTask.getShuffleOutpuNum();
+      if (eachTask.getLastAttempt() != null) {
+        TableStats inputStats = eachTask.getLastAttempt().getInputStats();
         if (inputStats != null) {
           totalInputBytes += inputStats.getNumBytes();
           totalReadBytes += inputStats.getReadBytes();
           totalReadRows += inputStats.getNumRows();
         }
-        TableStats outputStats = eachQueryUnit.getLastAttempt().getResultStats();
+        TableStats outputStats = eachTask.getLastAttempt().getResultStats();
         if (outputStats != null) {
           totalWriteBytes += outputStats.getNumBytes();
           totalWriteRows += outputStats.getNumRows();
@@ -511,11 +509,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     return block.getId();
   }
   
-  public QueryUnit[] getQueryUnits() {
-    return tasks.values().toArray(new QueryUnit[tasks.size()]);
+  public Task[] getTasks() {
+    return tasks.values().toArray(new Task[tasks.size()]);
   }
   
-  public QueryUnit getQueryUnit(QueryUnitId qid) {
+  public Task getTask(TaskId qid) {
     return tasks.get(qid);
   }
 
@@ -635,7 +633,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private TableStats[] computeStatFromTasks() {
     List<TableStats> inputStatsList = Lists.newArrayList();
     List<TableStats> resultStatsList = Lists.newArrayList();
-    for (QueryUnit unit : getQueryUnits()) {
+    for (Task unit : getTasks()) {
       resultStatsList.add(unit.getStats());
       if (unit.getLastAttempt().getInputStats() != null) {
         inputStatsList.add(unit.getLastAttempt().getInputStats());
@@ -1100,13 +1098,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         subQuery.getId(), fetches));
   }
 
-  public static QueryUnit newEmptyQueryUnit(TaskSchedulerContext schedulerContext,
-                                            QueryUnitAttemptScheduleContext queryUnitContext,
-                                            SubQuery subQuery, int taskId) {
+  public static Task newEmptyTask(TaskSchedulerContext schedulerContext,
+                                  TaskAttemptScheduleContext taskContext,
+                                  SubQuery subQuery, int taskId) {
     ExecutionBlock execBlock = subQuery.getBlock();
-    QueryUnit unit = new QueryUnit(schedulerContext.getMasterContext().getConf(),
-        queryUnitContext,
-        QueryIdFactory.newQueryUnitId(schedulerContext.getBlockId(), taskId),
+    Task unit = new Task(schedulerContext.getMasterContext().getConf(),
+        taskContext,
+        QueryIdFactory.newTaskId(schedulerContext.getBlockId(), taskId),
         schedulerContext.isLeafQuery(), subQuery.eventHandler);
     unit.setLogicalPlan(execBlock.getPlan());
     subQuery.addTask(unit);
@@ -1176,7 +1174,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     public void transition(SubQuery subQuery,
                            SubQueryEvent event) {
       SubQueryTaskEvent taskEvent = (SubQueryTaskEvent) event;
-      QueryUnit task = subQuery.getQueryUnit(taskEvent.getTaskId());
+      Task task = subQuery.getTask(taskEvent.getTaskId());
 
       if (task == null) { // task failed
         LOG.error(String.format("Task %s is absent", taskEvent.getTaskId()));
@@ -1217,8 +1215,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         subQuery.getTaskScheduler().stop();
       }
 
-      for (QueryUnit queryUnit : subQuery.getQueryUnits()) {
-        subQuery.eventHandler.handle(new TaskEvent(queryUnit.getId(), TaskEventType.T_KILL));
+      for (Task task : subQuery.getTasks()) {
+        subQuery.eventHandler.handle(new TaskEvent(task.getId(), TaskEventType.T_KILL));
       }
     }
   }
@@ -1239,7 +1237,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
 
     this.finalSubQueryHistory = makeSubQueryHistory();
-    this.finalSubQueryHistory.setQueryUnits(makeQueryUnitHistories());
+    this.finalSubQueryHistory.setTasks(makeTaskHistories());
   }
 
   public List<IntermediateEntry> getHashShuffleIntermediateEntries() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java
new file mode 100644
index 0000000..7de3933
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java
@@ -0,0 +1,907 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto;
+import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
+import org.apache.tajo.master.FragmentPair;
+import org.apache.tajo.master.TaskState;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.storage.DataLocation;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.util.history.TaskHistory;
+import org.apache.tajo.worker.FetchImpl;
+
+import java.net.URI;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
+
+public class Task implements EventHandler<TaskEvent> {
+  /** Class Logger */
+  private static final Log LOG = LogFactory.getLog(Task.class);
+
+  private final Configuration systemConf;
+	private TaskId taskId;
+  private EventHandler eventHandler;
+	private StoreTableNode store = null;
+	private LogicalNode plan = null;
+	private List<ScanNode> scan;
+	
+	private Map<String, Set<FragmentProto>> fragMap;
+	private Map<String, Set<FetchImpl>> fetchMap;
+
+  private int totalFragmentNum;
+
+  private List<ShuffleFileOutput> shuffleFileOutputs;
+	private TableStats stats;
+  private final boolean isLeafTask;
+  private List<IntermediateEntry> intermediateData;
+
+  private Map<TaskAttemptId, TaskAttempt> attempts;
+  private final int maxAttempts = 3;
+  private Integer nextAttempt = -1;
+  private TaskAttemptId lastAttemptId;
+
+  private TaskAttemptId successfulAttempt;
+  private String succeededHost;
+  private int succeededHostPort;
+  private int succeededPullServerPort;
+
+  private int failedAttempts;
+  private int finishedAttempts; // finish are total of success, failed and killed
+
+  private long launchTime;
+  private long finishTime;
+
+  private List<DataLocation> dataLocations = Lists.newArrayList();
+
+  private static final AttemptKilledTransition ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
+
+  private TaskHistory finalTaskHistory;
+
+  protected static final StateMachineFactory
+      <Task, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
+      new StateMachineFactory <Task, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
+
+          // Transitions from NEW state
+          .addTransition(TaskState.NEW, TaskState.SCHEDULED,
+              TaskEventType.T_SCHEDULE,
+              new InitialScheduleTransition())
+          .addTransition(TaskState.NEW, TaskState.KILLED,
+              TaskEventType.T_KILL,
+              new KillNewTaskTransition())
+
+          // Transitions from SCHEDULED state
+          .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
+              TaskEventType.T_ATTEMPT_LAUNCHED,
+              new AttemptLaunchedTransition())
+          .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT,
+              TaskEventType.T_KILL,
+              new KillTaskTransition())
+
+          // Transitions from RUNNING state
+          .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+              TaskEventType.T_ATTEMPT_LAUNCHED)
+          .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
+              TaskEventType.T_ATTEMPT_SUCCEEDED,
+              new AttemptSucceededTransition())
+          .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT,
+              TaskEventType.T_KILL,
+              new KillTaskTransition())
+          .addTransition(TaskState.RUNNING,
+              EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
+              TaskEventType.T_ATTEMPT_FAILED,
+              new AttemptFailedOrRetryTransition())
+
+          // Transitions from KILL_WAIT state
+          .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
+              TaskEventType.T_ATTEMPT_KILLED,
+              ATTEMPT_KILLED_TRANSITION)
+          .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
+              TaskEventType.T_ATTEMPT_LAUNCHED,
+              new KillTaskTransition())
+          .addTransition(TaskState.KILL_WAIT, TaskState.FAILED,
+              TaskEventType.T_ATTEMPT_FAILED,
+              new AttemptFailedTransition())
+          .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
+              TaskEventType.T_ATTEMPT_SUCCEEDED,
+              ATTEMPT_KILLED_TRANSITION)
+              // Ignore-able transitions.
+          .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
+              EnumSet.of(
+                  TaskEventType.T_KILL,
+                  TaskEventType.T_SCHEDULE))
+
+          // Transitions from SUCCEEDED state
+          // Ignore-able transitions
+          .addTransition(TaskState.SUCCEEDED, TaskState.SUCCEEDED,
+              EnumSet.of(TaskEventType.T_KILL,
+                  TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED))
+
+          // Transitions from FAILED state
+          // Ignore-able transitions
+          .addTransition(TaskState.FAILED, TaskState.FAILED,
+              EnumSet.of(TaskEventType.T_KILL,
+                  TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED))
+
+          // Transitions from KILLED state
+          .addTransition(TaskState.KILLED, TaskState.KILLED, TaskEventType.T_ATTEMPT_KILLED, new KillTaskTransition())
+          // Ignore-able transitions
+          .addTransition(TaskState.KILLED, TaskState.KILLED,
+              EnumSet.of(
+                  TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED))
+
+          .installTopology();
+
+  private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine;
+
+
+  private final Lock readLock;
+  private final Lock writeLock;
+  private TaskAttemptScheduleContext scheduleContext;
+
+	public Task(Configuration conf, TaskAttemptScheduleContext scheduleContext,
+              TaskId id, boolean isLeafTask, EventHandler eventHandler) {
+    this.systemConf = conf;
+		this.taskId = id;
+    this.eventHandler = eventHandler;
+    this.isLeafTask = isLeafTask;
+		scan = new ArrayList<ScanNode>();
+    fetchMap = Maps.newHashMap();
+    fragMap = Maps.newHashMap();
+    shuffleFileOutputs = new ArrayList<ShuffleFileOutput>();
+    attempts = Collections.emptyMap();
+    lastAttemptId = null;
+    nextAttempt = -1;
+    failedAttempts = 0;
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+    this.scheduleContext = scheduleContext;
+
+    stateMachine = stateMachineFactory.make(this);
+    totalFragmentNum = 0;
+	}
+
+  public boolean isLeafTask() {
+    return this.isLeafTask;
+  }
+
+  public TaskState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public TaskAttemptState getLastAttemptStatus() {
+    TaskAttempt lastAttempt = getLastAttempt();
+    if (lastAttempt != null) {
+      return lastAttempt.getState();
+    } else {
+      return TaskAttemptState.TA_ASSIGNED;
+    }
+  }
+
+  public TaskHistory getTaskHistory() {
+    if (finalTaskHistory != null) {
+      if (finalTaskHistory.getFinishTime() == 0) {
+        finalTaskHistory = makeTaskHistory();
+      }
+      return finalTaskHistory;
+    } else {
+      return makeTaskHistory();
+    }
+  }
+
+  private TaskHistory makeTaskHistory() {
+    TaskHistory taskHistory = new TaskHistory();
+
+    TaskAttempt lastAttempt = getLastAttempt();
+    if (lastAttempt != null) {
+      taskHistory.setId(lastAttempt.getId().toString());
+      taskHistory.setState(lastAttempt.getState().toString());
+      taskHistory.setProgress(lastAttempt.getProgress());
+    }
+    taskHistory.setHostAndPort(succeededHost + ":" + succeededHostPort);
+    taskHistory.setRetryCount(this.getRetryCount());
+    taskHistory.setLaunchTime(launchTime);
+    taskHistory.setFinishTime(finishTime);
+
+    taskHistory.setNumShuffles(getShuffleOutpuNum());
+    if (!getShuffleFileOutputs().isEmpty()) {
+      ShuffleFileOutput shuffleFileOutputs = getShuffleFileOutputs().get(0);
+      if (taskHistory.getNumShuffles() > 0) {
+        taskHistory.setShuffleKey("" + shuffleFileOutputs.getPartId());
+        taskHistory.setShuffleFileName(shuffleFileOutputs.getFileName());
+      }
+    }
+
+    List<String> fragmentList = new ArrayList<String>();
+    for (FragmentProto eachFragment : getAllFragments()) {
+      try {
+        Fragment fragment = FragmentConvertor.convert(systemConf, eachFragment);
+        fragmentList.add(fragment.toString());
+      } catch (Exception e) {
+        LOG.error(e.getMessage());
+        fragmentList.add("ERROR: " + eachFragment.getStoreType() + "," + eachFragment.getId() + ": " + e.getMessage());
+      }
+    }
+    taskHistory.setFragments(fragmentList.toArray(new String[]{}));
+
+    List<String[]> fetchList = new ArrayList<String[]>();
+    for (Map.Entry<String, Set<FetchImpl>> e : getFetchMap().entrySet()) {
+      for (FetchImpl f : e.getValue()) {
+        for (URI uri : f.getSimpleURIs()){
+          fetchList.add(new String[] {e.getKey(), uri.toString()});
+        }
+      }
+    }
+
+    taskHistory.setFetchs(fetchList.toArray(new String[][]{}));
+
+    List<String> dataLocationList = new ArrayList<String>();
+    for(DataLocation eachLocation: getDataLocations()) {
+      dataLocationList.add(eachLocation.toString());
+    }
+
+    taskHistory.setDataLocations(dataLocationList.toArray(new String[]{}));
+    return taskHistory;
+  }
+
+	public void setLogicalPlan(LogicalNode plan) {
+	  this.plan = plan;
+
+	  LogicalNode node = plan;
+	  ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
+	  s.add(node);
+	  while (!s.isEmpty()) {
+	    node = s.remove(s.size()-1);
+	    if (node instanceof UnaryNode) {
+	      UnaryNode unary = (UnaryNode) node;
+	      s.add(s.size(), unary.getChild());
+	    } else if (node instanceof BinaryNode) {
+	      BinaryNode binary = (BinaryNode) node;
+	      s.add(s.size(), binary.getLeftChild());
+	      s.add(s.size(), binary.getRightChild());
+	    } else if (node instanceof ScanNode) {
+	      scan.add((ScanNode)node);
+	    } else if (node instanceof TableSubQueryNode) {
+        s.add(((TableSubQueryNode) node).getSubQuery());
+      }
+	  }
+	}
+
+  private void addDataLocation(Fragment fragment) {
+    String[] hosts = fragment.getHosts();
+    int[] diskIds = null;
+    if (fragment instanceof FileFragment) {
+      diskIds = ((FileFragment)fragment).getDiskIds();
+    }
+    for (int i = 0; i < hosts.length; i++) {
+      dataLocations.add(new DataLocation(hosts[i], diskIds == null ? -1 : diskIds[i]));
+    }
+  }
+
+  public void addFragment(Fragment fragment, boolean useDataLocation) {
+    Set<FragmentProto> fragmentProtos;
+    if (fragMap.containsKey(fragment.getTableName())) {
+      fragmentProtos = fragMap.get(fragment.getTableName());
+    } else {
+      fragmentProtos = new HashSet<FragmentProto>();
+      fragMap.put(fragment.getTableName(), fragmentProtos);
+    }
+    fragmentProtos.add(fragment.getProto());
+    if (useDataLocation) {
+      addDataLocation(fragment);
+    }
+    totalFragmentNum++;
+  }
+
+  public void addFragments(Collection<Fragment> fragments) {
+    for (Fragment eachFragment: fragments) {
+      addFragment(eachFragment, false);
+    }
+  }
+
+  public void setFragment(FragmentPair[] fragmentPairs) {
+    for (FragmentPair eachFragmentPair : fragmentPairs) {
+      this.addFragment(eachFragmentPair.getLeftFragment(), true);
+      if (eachFragmentPair.getRightFragment() != null) {
+        this.addFragment(eachFragmentPair.getRightFragment(), true);
+      }
+    }
+  }
+
+  public List<DataLocation> getDataLocations() {
+    return dataLocations;
+  }
+
+  public String getSucceededHost() {
+    return succeededHost;
+  }
+	
+	public void addFetches(String tableId, Collection<FetchImpl> fetches) {
+	  Set<FetchImpl> fetchSet;
+    if (fetchMap.containsKey(tableId)) {
+      fetchSet = fetchMap.get(tableId);
+    } else {
+      fetchSet = Sets.newHashSet();
+    }
+    fetchSet.addAll(fetches);
+    fetchMap.put(tableId, fetchSet);
+	}
+	
+	public void setFetches(Map<String, Set<FetchImpl>> fetches) {
+	  this.fetchMap.clear();
+	  this.fetchMap.putAll(fetches);
+	}
+
+  public Collection<FragmentProto> getAllFragments() {
+    Set<FragmentProto> fragmentProtos = new HashSet<FragmentProto>();
+    for (Set<FragmentProto> eachFragmentSet : fragMap.values()) {
+      fragmentProtos.addAll(eachFragmentSet);
+    }
+    return fragmentProtos;
+  }
+	
+	public LogicalNode getLogicalPlan() {
+	  return this.plan;
+	}
+	
+	public TaskId getId() {
+		return taskId;
+	}
+	
+	public Collection<FetchImpl> getFetchHosts(String tableId) {
+	  return fetchMap.get(tableId);
+	}
+	
+	public Collection<Set<FetchImpl>> getFetches() {
+	  return fetchMap.values();
+	}
+
+  public Map<String, Set<FetchImpl>> getFetchMap() {
+    return fetchMap;
+  }
+	
+	public Collection<FetchImpl> getFetch(ScanNode scan) {
+	  return this.fetchMap.get(scan.getTableName());
+	}
+	
+	public ScanNode[] getScanNodes() {
+	  return this.scan.toArray(new ScanNode[scan.size()]);
+	}
+	
+	@Override
+	public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append(plan.getType() + " \n");
+		for (Entry<String, Set<FragmentProto>> e : fragMap.entrySet()) {
+		  builder.append(e.getKey()).append(" : ");
+      for (FragmentProto fragment : e.getValue()) {
+        builder.append(fragment).append(", ");
+      }
+		}
+		for (Entry<String, Set<FetchImpl>> e : fetchMap.entrySet()) {
+      builder.append(e.getKey()).append(" : ");
+      for (FetchImpl t : e.getValue()) {
+        for (URI uri : t.getURIs()){
+          builder.append(uri).append(" ");
+        }
+      }
+    }
+		
+		return builder.toString();
+	}
+	
+	public void setStats(TableStats stats) {
+	  this.stats = stats;
+	}
+	
+	public void setShuffleFileOutputs(List<ShuffleFileOutput> partitions) {
+	  this.shuffleFileOutputs = Collections.unmodifiableList(partitions);
+	}
+	
+	public TableStats getStats() {
+	  return this.stats;
+	}
+	
+	public List<ShuffleFileOutput> getShuffleFileOutputs() {
+	  return this.shuffleFileOutputs;
+	}
+	
+	public int getShuffleOutpuNum() {
+	  return this.shuffleFileOutputs.size();
+	}
+
+  public TaskAttempt newAttempt() {
+    TaskAttempt attempt = new TaskAttempt(scheduleContext,
+        QueryIdFactory.newTaskAttemptId(this.getId(), ++nextAttempt),
+        this, eventHandler);
+    lastAttemptId = attempt.getId();
+    return attempt;
+  }
+
+  public TaskAttempt getAttempt(TaskAttemptId attemptId) {
+    return attempts.get(attemptId);
+  }
+
+  public TaskAttempt getAttempt(int attempt) {
+    return this.attempts.get(QueryIdFactory.newTaskAttemptId(this.getId(), attempt));
+  }
+
+  public TaskAttempt getLastAttempt() {
+    return getAttempt(this.lastAttemptId);
+  }
+
+  public TaskAttempt getSuccessfulAttempt() {
+    readLock.lock();
+    try {
+      if (null == successfulAttempt) {
+        return null;
+      }
+      return attempts.get(successfulAttempt);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public int getRetryCount () {
+    return this.nextAttempt;
+  }
+
+  public int getTotalFragmentNum() {
+    return totalFragmentNum;
+  }
+
+  private static class InitialScheduleTransition implements
+    SingleArcTransition<Task, TaskEvent> {
+
+    @Override
+    public void transition(Task task, TaskEvent taskEvent) {
+      task.addAndScheduleAttempt();
+    }
+  }
+
+  public long getLaunchTime() {
+    return launchTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  @VisibleForTesting
+  public void setLaunchTime(long launchTime) {
+    this.launchTime = launchTime;
+  }
+
+  @VisibleForTesting
+  public void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  public long getRunningTime() {
+    if(finishTime > 0) {
+      return finishTime - launchTime;
+    } else {
+      return System.currentTimeMillis() - launchTime;
+    }
+  }
+
+  // This is always called in the Write Lock
+  private void addAndScheduleAttempt() {
+    // Create new task attempt
+    TaskAttempt attempt = newAttempt();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Created attempt " + attempt.getId());
+    }
+    switch (attempts.size()) {
+      case 0:
+        attempts = Collections.singletonMap(attempt.getId(), attempt);
+        break;
+
+      case 1:
+        Map<TaskAttemptId, TaskAttempt> newAttempts
+            = new LinkedHashMap<TaskAttemptId, TaskAttempt>(3);
+        newAttempts.putAll(attempts);
+        attempts = newAttempts;
+        attempts.put(attempt.getId(), attempt);
+        break;
+
+      default:
+        attempts.put(attempt.getId(), attempt);
+        break;
+    }
+
+    if (failedAttempts > 0) {
+      eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(),
+          TaskAttemptEventType.TA_RESCHEDULE));
+    } else {
+      eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(),
+          TaskAttemptEventType.TA_SCHEDULE));
+    }
+  }
+
+  private void finishTask() {
+    this.finishTime = System.currentTimeMillis();
+    finalTaskHistory = makeTaskHistory();
+  }
+
+  private static class KillNewTaskTransition implements SingleArcTransition<Task, TaskEvent> {
+
+    @Override
+    public void transition(Task task, TaskEvent taskEvent) {
+      task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED));
+    }
+  }
+
+  private static class KillTaskTransition implements SingleArcTransition<Task, TaskEvent> {
+
+    @Override
+    public void transition(Task task, TaskEvent taskEvent) {
+      task.finishTask();
+      task.eventHandler.handle(new TaskAttemptEvent(task.lastAttemptId, TaskAttemptEventType.TA_KILL));
+    }
+  }
+
+  private static class AttemptKilledTransition implements SingleArcTransition<Task, TaskEvent>{
+
+    @Override
+    public void transition(Task task, TaskEvent event) {
+      task.finishTask();
+      task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED));
+    }
+  }
+
+  private static class AttemptSucceededTransition
+      implements SingleArcTransition<Task, TaskEvent>{
+
+    @Override
+    public void transition(Task task,
+                           TaskEvent event) {
+      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
+      TaskAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
+
+      task.successfulAttempt = attemptEvent.getTaskAttemptId();
+      task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
+      task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort();
+      task.succeededPullServerPort = attempt.getWorkerConnectionInfo().getPullServerPort();
+
+      task.finishTask();
+      task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(), TaskState.SUCCEEDED));
+    }
+  }
+
+  private static class AttemptLaunchedTransition implements SingleArcTransition<Task, TaskEvent> {
+    @Override
+    public void transition(Task task,
+                           TaskEvent event) {
+      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
+      TaskAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
+      task.launchTime = System.currentTimeMillis();
+      task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
+      task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort();
+    }
+  }
+
+  private static class AttemptFailedTransition implements SingleArcTransition<Task, TaskEvent> {
+    @Override
+    public void transition(Task task, TaskEvent event) {
+      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
+      LOG.info("=============================================================");
+      LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<");
+      LOG.info("=============================================================");
+      task.failedAttempts++;
+      task.finishedAttempts++;
+
+      task.finishTask();
+      task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED));
+    }
+  }
+
+  private static class AttemptFailedOrRetryTransition implements
+    MultipleArcTransition<Task, TaskEvent, TaskState> {
+
+    @Override
+    public TaskState transition(Task task, TaskEvent taskEvent) {
+      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent;
+      task.failedAttempts++;
+      task.finishedAttempts++;
+      boolean retry = task.failedAttempts < task.maxAttempts;
+
+      LOG.info("====================================================================================");
+      LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + ", " +
+          "retry:" + retry + ", attempts:" +  task.failedAttempts + " <<<");
+      LOG.info("====================================================================================");
+
+      if (retry) {
+        if (task.successfulAttempt == null) {
+          task.addAndScheduleAttempt();
+        }
+      } else {
+        task.finishTask();
+        task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED));
+        return TaskState.FAILED;
+      }
+
+      return task.getState();
+    }
+  }
+
+  @Override
+  public void handle(TaskEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getTaskId() + " of type "
+          + event.getType());
+    }
+
+    try {
+      writeLock.lock();
+      TaskState oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state"
+            + ", eventType:" + event.getType().name()
+            + ", oldState:" + oldState.name()
+            + ", nextState:" + getState().name()
+            , e);
+        eventHandler.handle(new QueryEvent(TajoIdUtils.parseQueryId(getId().toString()),
+            QueryEventType.INTERNAL_ERROR));
+      }
+
+      //notify the eventhandler of state change
+      if (LOG.isDebugEnabled()) {
+        if (oldState != getState()) {
+          LOG.debug(taskId + " Task Transitioned from " + oldState + " to "
+              + getState());
+        }
+      }
+    }
+
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void setIntermediateData(Collection<IntermediateEntry> partitions) {
+    this.intermediateData = new ArrayList<IntermediateEntry>(partitions);
+  }
+
+  public List<IntermediateEntry> getIntermediateData() {
+    return this.intermediateData;
+  }
+
+  public static class PullHost implements Cloneable {
+    String host;
+    int port;
+    int hashCode;
+
+    public PullHost(String pullServerAddr, int pullServerPort){
+      this.host = pullServerAddr;
+      this.port = pullServerPort;
+      this.hashCode = Objects.hashCode(host, port);
+    }
+    public String getHost() {
+      return host;
+    }
+
+    public int getPort() {
+      return this.port;
+    }
+
+    public String getPullAddress() {
+      return host + ":" + port;
+    }
+
+    @Override
+    public int hashCode() {
+      return hashCode;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof PullHost) {
+        PullHost other = (PullHost) obj;
+        return host.equals(other.host) && port == other.port;
+      }
+
+      return false;
+    }
+
+    @Override
+    public PullHost clone() throws CloneNotSupportedException {
+      PullHost newPullHost = (PullHost) super.clone();
+      newPullHost.host = host;
+      newPullHost.port = port;
+      newPullHost.hashCode = Objects.hashCode(newPullHost.host, newPullHost.port);
+      return newPullHost;
+    }
+
+    @Override
+    public String toString() {
+      return host + ":" + port;
+    }
+  }
+
+  public static class IntermediateEntry {
+    ExecutionBlockId ebId;
+    int taskId;
+    int attemptId;
+    int partId;
+    PullHost host;
+    long volume;
+    List<Pair<Long, Integer>> pages;
+    List<Pair<Long, Pair<Integer, Integer>>> failureRowNums;
+
+    public IntermediateEntry(IntermediateEntryProto proto) {
+      this.ebId = new ExecutionBlockId(proto.getEbId());
+      this.taskId = proto.getTaskId();
+      this.attemptId = proto.getAttemptId();
+      this.partId = proto.getPartId();
+
+      String[] pullHost = proto.getHost().split(":");
+      this.host = new PullHost(pullHost[0], Integer.parseInt(pullHost[1]));
+      this.volume = proto.getVolume();
+
+      failureRowNums = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
+      for (FailureIntermediateProto eachFailure: proto.getFailuresList()) {
+
+        failureRowNums.add(new Pair(eachFailure.getPagePos(),
+            new Pair(eachFailure.getStartRowNum(), eachFailure.getEndRowNum())));
+      }
+
+      pages = new ArrayList<Pair<Long, Integer>>();
+      for (IntermediateEntryProto.PageProto eachPage: proto.getPagesList()) {
+        pages.add(new Pair(eachPage.getPos(), eachPage.getLength()));
+      }
+    }
+
+    public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) {
+      this.taskId = taskId;
+      this.attemptId = attemptId;
+      this.partId = partId;
+      this.host = host;
+    }
+
+    public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host, long volume) {
+      this.taskId = taskId;
+      this.attemptId = attemptId;
+      this.partId = partId;
+      this.host = host;
+      this.volume = volume;
+    }
+
+    public ExecutionBlockId getEbId() {
+      return ebId;
+    }
+
+    public void setEbId(ExecutionBlockId ebId) {
+      this.ebId = ebId;
+    }
+
+    public int getTaskId() {
+      return this.taskId;
+    }
+
+    public int getAttemptId() {
+      return this.attemptId;
+    }
+
+    public int getPartId() {
+      return this.partId;
+    }
+
+    public PullHost getPullHost() {
+      return this.host;
+    }
+
+    public long getVolume() {
+      return this.volume;
+    }
+
+    public long setVolume(long volume) {
+      return this.volume = volume;
+    }
+
+    public List<Pair<Long, Integer>> getPages() {
+      return pages;
+    }
+
+    public void setPages(List<Pair<Long, Integer>> pages) {
+      this.pages = pages;
+    }
+
+    public List<Pair<Long, Pair<Integer, Integer>>> getFailureRowNums() {
+      return failureRowNums;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(ebId, taskId, partId, attemptId, host);
+    }
+
+    public List<Pair<Long, Long>> split(long firstSplitVolume, long splitVolume) {
+      List<Pair<Long, Long>> splits = new ArrayList<Pair<Long, Long>>();
+
+      if (pages == null || pages.isEmpty()) {
+        return splits;
+      }
+      int pageSize = pages.size();
+
+      long currentOffset = -1;
+      long currentBytes = 0;
+
+      long realSplitVolume = firstSplitVolume > 0 ? firstSplitVolume : splitVolume;
+      for (int i = 0; i < pageSize; i++) {
+        Pair<Long, Integer> eachPage = pages.get(i);
+        if (currentOffset == -1) {
+          currentOffset = eachPage.getFirst();
+        }
+        if (currentBytes > 0 && currentBytes + eachPage.getSecond() >= realSplitVolume) {
+          splits.add(new Pair(currentOffset, currentBytes));
+          currentOffset = eachPage.getFirst();
+          currentBytes = 0;
+          realSplitVolume = splitVolume;
+        }
+
+        currentBytes += eachPage.getSecond();
+      }
+
+      //add last
+      if (currentBytes > 0) {
+        splits.add(new Pair(currentOffset, currentBytes));
+      }
+      return splits;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java
new file mode 100644
index 0000000..63c6dbb
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java
@@ -0,0 +1,443 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
+import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.master.querymaster.Task.IntermediateEntry;
+import org.apache.tajo.master.querymaster.Task.PullHost;
+import org.apache.tajo.master.container.TajoContainerId;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
+
+public class TaskAttempt implements EventHandler<TaskAttemptEvent> {
+
+  private static final Log LOG = LogFactory.getLog(TaskAttempt.class);
+
+  private final static int EXPIRE_TIME = 15000;
+
+  private final TaskAttemptId id;
+  private final Task task;
+  final EventHandler eventHandler;
+
+  private TajoContainerId containerId;
+  private WorkerConnectionInfo workerConnectionInfo;
+  private int expire;
+
+  private final Lock readLock;
+  private final Lock writeLock;
+
+  private final List<String> diagnostics = new ArrayList<String>();
+
+  private final TaskAttemptScheduleContext scheduleContext;
+
+  private float progress;
+  private CatalogProtos.TableStatsProto inputStats;
+  private CatalogProtos.TableStatsProto resultStats;
+
+  protected static final StateMachineFactory
+      <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+      stateMachineFactory = new StateMachineFactory
+      <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+      (TaskAttemptState.TA_NEW)
+
+      // Transitions from TA_NEW state
+      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
+          TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition())
+      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
+          TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition())
+      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_KILL,
+          new TaskKilledCompleteTransition())
+
+      // Transitions from TA_UNASSIGNED state
+      .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED,
+          TaskAttemptEventType.TA_ASSIGNED,
+          new LaunchTransition())
+      .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_KILL,
+          new KillUnassignedTaskTransition())
+
+      // Transitions from TA_ASSIGNED state
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED,
+          TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_KILL,
+          new KillTaskTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_KILL,
+          new KillTaskTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED,
+          EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
+          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_DONE, new SucceededTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+
+      // Transitions from TA_RUNNING state
+      .addTransition(TaskAttemptState.TA_RUNNING,
+          EnumSet.of(TaskAttemptState.TA_RUNNING),
+          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
+      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_KILL,
+          new KillTaskTransition())
+      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_DONE, new SucceededTransition())
+      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_LOCAL_KILLED,
+          new TaskKilledCompleteTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_ASSIGNED,
+          new KillTaskTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_SCHEDULE_CANCELED,
+          new TaskKilledCompleteTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_DONE,
+          new TaskKilledCompleteTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR)
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
+          EnumSet.of(
+              TaskAttemptEventType.TA_KILL,
+              TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+              TaskAttemptEventType.TA_UPDATE))
+
+      // Transitions from TA_SUCCEEDED state
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_UPDATE)
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition())
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+       // Ignore-able transitions
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_KILL)
+
+      // Transitions from TA_KILLED state
+      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE)
+      // Ignore-able transitions
+      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+          EnumSet.of(
+              TaskAttemptEventType.TA_UPDATE))
+      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+          EnumSet.of(
+              TaskAttemptEventType.TA_LOCAL_KILLED,
+              TaskAttemptEventType.TA_KILL,
+              TaskAttemptEventType.TA_ASSIGNED,
+              TaskAttemptEventType.TA_DONE),
+          new TaskKilledCompleteTransition())
+      .installTopology();
+
+  private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+    stateMachine;
+
+
+  public TaskAttempt(final TaskAttemptScheduleContext scheduleContext,
+                     final TaskAttemptId id, final Task task,
+                     final EventHandler eventHandler) {
+    this.scheduleContext = scheduleContext;
+    this.id = id;
+    this.expire = TaskAttempt.EXPIRE_TIME;
+    this.task = task;
+    this.eventHandler = eventHandler;
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  public TaskAttemptState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public TaskAttemptId getId() {
+    return this.id;
+  }
+
+  public boolean isLeafTask() {
+    return this.task.isLeafTask();
+  }
+
+  public Task getTask() {
+    return this.task;
+  }
+
+  public WorkerConnectionInfo getWorkerConnectionInfo() {
+    return this.workerConnectionInfo;
+  }
+
+  public void setContainerId(TajoContainerId containerId) {
+    this.containerId = containerId;
+  }
+
+  public synchronized void setExpireTime(int expire) {
+    this.expire = expire;
+  }
+
+  public synchronized void updateExpireTime(int period) {
+    this.setExpireTime(this.expire - period);
+  }
+
+  public synchronized void resetExpireTime() {
+    this.setExpireTime(TaskAttempt.EXPIRE_TIME);
+  }
+
+  public int getLeftTime() {
+    return this.expire;
+  }
+
+  public float getProgress() {
+    return progress;
+  }
+
+  public TableStats getInputStats() {
+    if (inputStats == null) {
+      return null;
+    }
+
+    return new TableStats(inputStats);
+  }
+
+  public TableStats getResultStats() {
+    if (resultStats == null) {
+      return null;
+    }
+    return new TableStats(resultStats);
+  }
+
+  private void fillTaskStatistics(TaskCompletionReport report) {
+    this.progress = 1.0f;
+
+    List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
+
+    if (report.getShuffleFileOutputsCount() > 0) {
+      this.getTask().setShuffleFileOutputs(report.getShuffleFileOutputsList());
+
+      PullHost host = new PullHost(getWorkerConnectionInfo().getHost(), getWorkerConnectionInfo().getPullServerPort());
+      for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) {
+        IntermediateEntry entry = new IntermediateEntry(getId().getTaskId().getId(),
+            getId().getId(), p.getPartId(), host, p.getVolume());
+        partitions.add(entry);
+      }
+    }
+    this.getTask().setIntermediateData(partitions);
+
+    if (report.hasInputStats()) {
+      this.inputStats = report.getInputStats();
+    }
+    if (report.hasResultStats()) {
+      this.resultStats = report.getResultStats();
+      this.getTask().setStats(new TableStats(resultStats));
+    }
+  }
+
+  private static class TaskAttemptScheduleTransition implements
+      SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
+      taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent(
+          EventType.T_SCHEDULE, taskAttempt.getTask().getId().getExecutionBlockId(),
+          taskAttempt.scheduleContext, taskAttempt));
+    }
+  }
+
+  private static class KillUnassignedTaskTransition implements
+      SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
+      taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent(
+          EventType.T_SCHEDULE_CANCEL, taskAttempt.getTask().getId().getExecutionBlockId(),
+          taskAttempt.scheduleContext, taskAttempt));
+    }
+  }
+
+  private static class LaunchTransition
+      implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+      TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
+      taskAttempt.containerId = castEvent.getContainerId();
+      taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo();
+      taskAttempt.eventHandler.handle(
+          new TaskTAttemptEvent(taskAttempt.getId(),
+              TaskEventType.T_ATTEMPT_LAUNCHED));
+    }
+  }
+
+  private static class TaskKilledCompleteTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+      taskAttempt.getTask().handle(new TaskEvent(taskAttempt.getId().getTaskId(),
+          TaskEventType.T_ATTEMPT_KILLED));
+      LOG.info(taskAttempt.getId() + " Received TA_KILLED Status from LocalTask");
+    }
+  }
+
+  private static class StatusUpdateTransition
+      implements MultipleArcTransition<TaskAttempt, TaskAttemptEvent, TaskAttemptState> {
+
+    @Override
+    public TaskAttemptState transition(TaskAttempt taskAttempt,
+                                       TaskAttemptEvent event) {
+      TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event;
+
+      taskAttempt.progress = updateEvent.getStatus().getProgress();
+      taskAttempt.inputStats = updateEvent.getStatus().getInputStats();
+      taskAttempt.resultStats = updateEvent.getStatus().getResultStats();
+
+      return TaskAttemptState.TA_RUNNING;
+    }
+  }
+
+  private void addDiagnosticInfo(String diag) {
+    if (diag != null && !diag.equals("")) {
+      diagnostics.add(diag);
+    }
+  }
+
+  private static class AlreadyAssignedTransition
+      implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{
+
+    @Override
+    public void transition(TaskAttempt taskAttempt,
+                           TaskAttemptEvent taskAttemptEvent) {
+    }
+  }
+
+  private static class AlreadyDoneTransition
+      implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{
+
+    @Override
+    public void transition(TaskAttempt taskAttempt,
+                           TaskAttemptEvent taskAttemptEvent) {
+    }
+  }
+
+  private static class SucceededTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+    @Override
+    public void transition(TaskAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+      TaskCompletionReport report = ((TaskCompletionEvent)event).getReport();
+
+      try {
+        taskAttempt.fillTaskStatistics(report);
+        taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED));
+      } catch (Throwable t) {
+        taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage()));
+        taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t));
+      }
+    }
+  }
+
+  private static class KillTaskTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
+      taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), taskAttempt.containerId,
+          LocalTaskEventType.KILL));
+    }
+  }
+
+  private static class FailedTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{
+    @Override
+    public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
+      TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED));
+      taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
+      LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost()
+          + " >> " + errorEvent.errorMessage());
+    }
+  }
+
+  @Override
+  public void handle(TaskAttemptEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getTaskAttemptId() + " of type " + event.getType());
+    }
+    try {
+      writeLock.lock();
+      TaskAttemptState oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")"
+            + ", eventType:" + event.getType().name()
+            + ", oldState:" + oldState.name()
+            + ", nextState:" + getState().name()
+            , e);
+        eventHandler.handle(
+            new SubQueryDiagnosticsUpdateEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(),
+                "Can't handle this event at current state of " + event.getTaskAttemptId() + ")"));
+        eventHandler.handle(
+            new SubQueryEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(),
+                SubQueryEventType.SQ_INTERNAL_ERROR));
+      }
+
+      //notify the eventhandler of state change
+      if (LOG.isDebugEnabled()) {
+       if (oldState != getState()) {
+          LOG.debug(id + " TaskAttempt Transitioned from " + oldState + " to "
+              + getState());
+        }
+      }
+    }
+
+    finally {
+      writeLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index 6d3597d..0573197 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -27,9 +27,9 @@ import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.master.ha.HAService;
 import org.apache.tajo.master.querymaster.QueryInProgress;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.Task;
 import org.apache.tajo.master.querymaster.SubQuery;
-import org.apache.tajo.util.history.QueryUnitHistory;
+import org.apache.tajo.util.history.TaskHistory;
 import org.apache.tajo.util.history.SubQueryHistory;
 import org.apache.tajo.worker.TaskRunnerHistory;
 import org.apache.tajo.worker.TaskRunner;
@@ -42,28 +42,28 @@ import static org.apache.tajo.conf.TajoConf.ConfVars;
 public class JSPUtil {
   static DecimalFormat decimalF = new DecimalFormat("###.0");
 
-  public static void sortQueryUnitArray(QueryUnit[] queryUnits, String sortField, String sortOrder) {
+  public static void sortTaskArray(Task[] tasks, String sortField, String sortOrder) {
     if(sortField == null || sortField.isEmpty()) {
       sortField = "id";
     }
 
-    Arrays.sort(queryUnits, new QueryUnitComparator(sortField, "asc".equals(sortOrder)));
+    Arrays.sort(tasks, new TaskComparator(sortField, "asc".equals(sortOrder)));
   }
 
-  public static void sortQueryUnit(List<QueryUnit> queryUnits, String sortField, String sortOrder) {
+  public static void sortTasks(List<Task> tasks, String sortField, String sortOrder) {
     if(sortField == null || sortField.isEmpty()) {
       sortField = "id";
     }
 
-    Collections.sort(queryUnits, new QueryUnitComparator(sortField, "asc".equals(sortOrder)));
+    Collections.sort(tasks, new TaskComparator(sortField, "asc".equals(sortOrder)));
   }
 
-  public static void sortQueryUnitHistory(List<QueryUnitHistory> queryUnits, String sortField, String sortOrder) {
+  public static void sortTaskHistory(List<TaskHistory> tasks, String sortField, String sortOrder) {
     if(sortField == null || sortField.isEmpty()) {
       sortField = "id";
     }
 
-    Collections.sort(queryUnits, new QueryUnitHistoryComparator(sortField, "asc".equals(sortOrder)));
+    Collections.sort(tasks, new TaskHistoryComparator(sortField, "asc".equals(sortOrder)));
   }
 
   public static void sortTaskRunner(List<TaskRunner> taskRunners) {
@@ -204,95 +204,95 @@ public class JSPUtil {
     return activeLabel;
   }
 
-  static class QueryUnitComparator implements Comparator<QueryUnit> {
+  static class TaskComparator implements Comparator<Task> {
     private String sortField;
     private boolean asc;
-    public QueryUnitComparator(String sortField, boolean asc) {
+    public TaskComparator(String sortField, boolean asc) {
       this.sortField = sortField;
       this.asc = asc;
     }
 
     @Override
-    public int compare(QueryUnit queryUnit, QueryUnit queryUnit2) {
+    public int compare(Task task, Task task2) {
       if(asc) {
         if("id".equals(sortField)) {
-          return queryUnit.getId().compareTo(queryUnit2.getId());
+          return task.getId().compareTo(task2.getId());
         } else if("host".equals(sortField)) {
-          String host1 = queryUnit.getSucceededHost() == null ? "-" : queryUnit.getSucceededHost();
-          String host2 = queryUnit2.getSucceededHost() == null ? "-" : queryUnit2.getSucceededHost();
+          String host1 = task.getSucceededHost() == null ? "-" : task.getSucceededHost();
+          String host2 = task2.getSucceededHost() == null ? "-" : task2.getSucceededHost();
           return host1.compareTo(host2);
         } else if("runTime".equals(sortField)) {
-          return compareLong(queryUnit.getRunningTime(), queryUnit2.getRunningTime());
+          return compareLong(task.getRunningTime(), task2.getRunningTime());
         } else if("startTime".equals(sortField)) {
-          return compareLong(queryUnit.getLaunchTime(), queryUnit2.getLaunchTime());
+          return compareLong(task.getLaunchTime(), task2.getLaunchTime());
         } else {
-          return queryUnit.getId().compareTo(queryUnit2.getId());
+          return task.getId().compareTo(task2.getId());
         }
       } else {
         if("id".equals(sortField)) {
-          return queryUnit2.getId().compareTo(queryUnit.getId());
+          return task2.getId().compareTo(task.getId());
         } else if("host".equals(sortField)) {
-          String host1 = queryUnit.getSucceededHost() == null ? "-" : queryUnit.getSucceededHost();
-          String host2 = queryUnit2.getSucceededHost() == null ? "-" : queryUnit2.getSucceededHost();
+          String host1 = task.getSucceededHost() == null ? "-" : task.getSucceededHost();
+          String host2 = task2.getSucceededHost() == null ? "-" : task2.getSucceededHost();
           return host2.compareTo(host1);
         } else if("runTime".equals(sortField)) {
-          if(queryUnit2.getLaunchTime() == 0) {
+          if(task2.getLaunchTime() == 0) {
             return -1;
-          } else if(queryUnit.getLaunchTime() == 0) {
+          } else if(task.getLaunchTime() == 0) {
             return 1;
           }
-          return compareLong(queryUnit2.getRunningTime(), queryUnit.getRunningTime());
+          return compareLong(task2.getRunningTime(), task.getRunningTime());
         } else if("startTime".equals(sortField)) {
-          return compareLong(queryUnit2.getLaunchTime(), queryUnit.getLaunchTime());
+          return compareLong(task2.getLaunchTime(), task.getLaunchTime());
         } else {
-          return queryUnit2.getId().compareTo(queryUnit.getId());
+          return task2.getId().compareTo(task.getId());
         }
       }
     }
   }
 
-  static class QueryUnitHistoryComparator implements Comparator<QueryUnitHistory> {
+  static class TaskHistoryComparator implements Comparator<TaskHistory> {
     private String sortField;
     private boolean asc;
-    public QueryUnitHistoryComparator(String sortField, boolean asc) {
+    public TaskHistoryComparator(String sortField, boolean asc) {
       this.sortField = sortField;
       this.asc = asc;
     }
 
     @Override
-    public int compare(QueryUnitHistory queryUnit, QueryUnitHistory queryUnit2) {
+    public int compare(TaskHistory task1, TaskHistory task2) {
       if(asc) {
         if("id".equals(sortField)) {
-          return queryUnit.getId().compareTo(queryUnit2.getId());
+          return task1.getId().compareTo(task2.getId());
         } else if("host".equals(sortField)) {
-          String host1 = queryUnit.getHostAndPort() == null ? "-" : queryUnit.getHostAndPort();
-          String host2 = queryUnit2.getHostAndPort() == null ? "-" : queryUnit2.getHostAndPort();
+          String host1 = task1.getHostAndPort() == null ? "-" : task1.getHostAndPort();
+          String host2 = task2.getHostAndPort() == null ? "-" : task2.getHostAndPort();
           return host1.compareTo(host2);
         } else if("runTime".equals(sortField)) {
-          return compareLong(queryUnit.getRunningTime(), queryUnit2.getRunningTime());
+          return compareLong(task1.getRunningTime(), task2.getRunningTime());
         } else if("startTime".equals(sortField)) {
-          return compareLong(queryUnit.getLaunchTime(), queryUnit2.getLaunchTime());
+          return compareLong(task1.getLaunchTime(), task2.getLaunchTime());
         } else {
-          return queryUnit.getId().compareTo(queryUnit2.getId());
+          return task1.getId().compareTo(task2.getId());
         }
       } else {
         if("id".equals(sortField)) {
-          return queryUnit2.getId().compareTo(queryUnit.getId());
+          return task2.getId().compareTo(task1.getId());
         } else if("host".equals(sortField)) {
-          String host1 = queryUnit.getHostAndPort() == null ? "-" : queryUnit.getHostAndPort();
-          String host2 = queryUnit2.getHostAndPort() == null ? "-" : queryUnit2.getHostAndPort();
+          String host1 = task1.getHostAndPort() == null ? "-" : task1.getHostAndPort();
+          String host2 = task2.getHostAndPort() == null ? "-" : task2.getHostAndPort();
           return host2.compareTo(host1);
         } else if("runTime".equals(sortField)) {
-          if(queryUnit2.getLaunchTime() == 0) {
+          if(task2.getLaunchTime() == 0) {
             return -1;
-          } else if(queryUnit.getLaunchTime() == 0) {
+          } else if(task1.getLaunchTime() == 0) {
             return 1;
           }
-          return compareLong(queryUnit2.getRunningTime(), queryUnit.getRunningTime());
+          return compareLong(task2.getRunningTime(), task1.getRunningTime());
         } else if("startTime".equals(sortField)) {
-          return compareLong(queryUnit2.getLaunchTime(), queryUnit.getLaunchTime());
+          return compareLong(task2.getLaunchTime(), task1.getLaunchTime());
         } else {
-          return queryUnit2.getId().compareTo(queryUnit.getId());
+          return task2.getId().compareTo(task1.getId());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
index 21bc725..9fb427f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
@@ -24,11 +24,10 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto;
 import org.apache.tajo.master.querymaster.QueryInfo;
-import org.apache.tajo.worker.TaskHistory;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -187,22 +186,21 @@ public class HistoryReader {
     }
   }
 
-  public List<QueryUnitHistory> getQueryUnitHistory(String queryId, String ebId) throws IOException {
+  public List<TaskHistory> getTaskHistory(String queryId, String ebId) throws IOException {
     Path queryHistoryFile = getQueryHistoryFilePath(queryId, 0);
     if (queryHistoryFile == null) {
-      return new ArrayList<QueryUnitHistory>();
+      return new ArrayList<TaskHistory>();
     }
     Path detailFile = new Path(queryHistoryFile.getParent(), ebId + HistoryWriter.HISTORY_FILE_POSTFIX);
     FileSystem fs = HistoryWriter.getNonCrcFileSystem(detailFile, tajoConf);
 
     if (!fs.exists(detailFile)) {
-      return new ArrayList<QueryUnitHistory>();
+      return new ArrayList<TaskHistory>();
     }
 
     FileStatus fileStatus = fs.getFileStatus(detailFile);
     if (fileStatus.getLen() > 100 * 1024 * 1024) {    // 100MB
-      throw new IOException("QueryUnitHistory file is too big: " +
-          detailFile + ", " + fileStatus.getLen() + " bytes");
+      throw new IOException("TaskHistory file is too big: " + detailFile + ", " + fileStatus.getLen() + " bytes");
     }
 
     FSDataInputStream in = null;
@@ -212,7 +210,7 @@ public class HistoryReader {
 
       in.readFully(buf, 0, buf.length);
 
-      return SubQueryHistory.fromJsonQueryUnits(new String(buf));
+      return SubQueryHistory.fromJsonTasks(new String(buf));
     } finally {
       if (in != null) {
         in.close();
@@ -220,7 +218,7 @@ public class HistoryReader {
     }
   }
 
-  public TaskHistory getTaskHistory(String queryUnitAttemptId, long startTime) throws IOException {
+  public org.apache.tajo.worker.TaskHistory getTaskHistory(String taskAttemptId, long startTime) throws IOException {
     FileSystem fs = HistoryWriter.getNonCrcFileSystem(taskHistoryParentPath, tajoConf);
 
     SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
@@ -276,9 +274,9 @@ public class HistoryReader {
 
             builder.clear();
             TaskHistoryProto taskHistoryProto = builder.mergeFrom(buf).build();
-            QueryUnitAttemptId attemptId = new QueryUnitAttemptId(taskHistoryProto.getQueryUnitAttemptId());
-            if (attemptId.toString().equals(queryUnitAttemptId)) {
-              return new TaskHistory(taskHistoryProto);
+            TaskAttemptId attemptId = new TaskAttemptId(taskHistoryProto.getTaskAttemptId());
+            if (attemptId.toString().equals(taskAttemptId)) {
+              return new org.apache.tajo.worker.TaskHistory(taskHistoryProto);
             }
           }
         } catch (EOFException e) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
index 17c9366..7e30f9c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
@@ -238,7 +238,7 @@ public class HistoryWriter extends AbstractService {
               writeTaskHistory((TaskHistory) eachHistory);
             } catch (Exception e) {
               LOG.error("Error while saving task history: " +
-                  ((TaskHistory) eachHistory).getQueryUnitAttemptId() + ":" + e.getMessage(), e);
+                  ((TaskHistory) eachHistory).getTaskAttemptId() + ":" + e.getMessage(), e);
             }
             break;
           case QUERY:
@@ -301,7 +301,7 @@ public class HistoryWriter extends AbstractService {
           out = null;
           try {
             out = fs.create(path);
-            out.write(subQueryHistory.toQueryUnitsJson().getBytes());
+            out.write(subQueryHistory.toTasksJson().getBytes());
             LOG.info("Saving query unit: " + path);
           } finally {
             if (out != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/util/history/QueryUnitHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/QueryUnitHistory.java b/tajo-core/src/main/java/org/apache/tajo/util/history/QueryUnitHistory.java
deleted file mode 100644
index 556a971..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/QueryUnitHistory.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.util.history;
-
-import com.google.gson.annotations.Expose;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.json.GsonObject;
-
-public class QueryUnitHistory implements GsonObject {
-  @Expose private String id;
-  @Expose private String hostAndPort;
-  @Expose private int httpPort;
-  @Expose private String state;
-  @Expose private float progress;
-  @Expose private long launchTime;
-  @Expose private long finishTime;
-  @Expose private int retryCount;
-
-  @Expose private int numShuffles;
-  @Expose private String shuffleKey;
-  @Expose private String shuffleFileName;
-
-  @Expose private String[] dataLocations;
-  @Expose private String[] fragments;
-  @Expose private String[][] fetchs;
-
-  public String getId() {
-    return id;
-  }
-
-  public void setId(String id) {
-    this.id = id;
-  }
-
-  public String getHostAndPort() {
-    return hostAndPort;
-  }
-
-  public void setHostAndPort(String hostAndPort) {
-    this.hostAndPort = hostAndPort;
-  }
-
-  public int getHttpPort() {
-    return httpPort;
-  }
-
-  public void setHttpPort(int httpPort) {
-    this.httpPort = httpPort;
-  }
-
-  public int getRetryCount() {
-    return retryCount;
-  }
-
-  public void setRetryCount(int retryCount) {
-    this.retryCount = retryCount;
-  }
-
-  public String getState() {
-    return state;
-  }
-
-  public void setState(String state) {
-    this.state = state;
-  }
-
-  public float getProgress() {
-    return progress;
-  }
-
-  public void setProgress(float progress) {
-    this.progress = progress;
-  }
-
-  public long getLaunchTime() {
-    return launchTime;
-  }
-
-  public void setLaunchTime(long launchTime) {
-    this.launchTime = launchTime;
-  }
-
-  public long getFinishTime() {
-    return finishTime;
-  }
-
-  public void setFinishTime(long finishTime) {
-    this.finishTime = finishTime;
-  }
-
-  public int getNumShuffles() {
-    return numShuffles;
-  }
-
-  public void setNumShuffles(int numShuffles) {
-    this.numShuffles = numShuffles;
-  }
-
-  public String getShuffleKey() {
-    return shuffleKey;
-  }
-
-  public void setShuffleKey(String shuffleKey) {
-    this.shuffleKey = shuffleKey;
-  }
-
-  public String getShuffleFileName() {
-    return shuffleFileName;
-  }
-
-  public void setShuffleFileName(String shuffleFileName) {
-    this.shuffleFileName = shuffleFileName;
-  }
-
-  public String[] getDataLocations() {
-    return dataLocations;
-  }
-
-  public void setDataLocations(String[] dataLocations) {
-    this.dataLocations = dataLocations;
-  }
-
-  public String[] getFragments() {
-    return fragments;
-  }
-
-  public void setFragments(String[] fragments) {
-    this.fragments = fragments;
-  }
-
-  public String[][] getFetchs() {
-    return fetchs;
-  }
-
-  public void setFetchs(String[][] fetchs) {
-    this.fetchs = fetchs;
-  }
-
-  @Override
-  public String toJson() {
-    return CoreGsonHelper.toJson(this, QueryUnitHistory.class);
-  }
-
-  public long getRunningTime() {
-    if(finishTime > 0) {
-      return finishTime - launchTime;
-    } else {
-      return 0;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java b/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java
index b3ac4d2..0afdf5a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java
@@ -67,7 +67,7 @@ public class SubQueryHistory implements GsonObject {
   @Expose
   private int rackLocalAssigned;
 
-  private List<QueryUnitHistory> queryUnits;
+  private List<TaskHistory> tasks;
 
   public String getExecutionBlockId() {
     return executionBlockId;
@@ -213,12 +213,12 @@ public class SubQueryHistory implements GsonObject {
     this.killedObjectCount = killedObjectCount;
   }
 
-  public List<QueryUnitHistory> getQueryUnits() {
-    return queryUnits;
+  public List<TaskHistory> getTasks() {
+    return tasks;
   }
 
-  public void setQueryUnits(List<QueryUnitHistory> queryUnits) {
-    this.queryUnits = queryUnits;
+  public void setTasks(List<TaskHistory> tasks) {
+    this.tasks = tasks;
   }
 
   @Override
@@ -226,19 +226,19 @@ public class SubQueryHistory implements GsonObject {
     return CoreGsonHelper.toJson(this, SubQueryHistory.class);
   }
 
-  public String toQueryUnitsJson() {
-    if (queryUnits == null) {
+  public String toTasksJson() {
+    if (tasks == null) {
       return "";
     }
-    return CoreGsonHelper.getInstance().toJson(queryUnits, new TypeToken<List<QueryUnitHistory>>() {
+    return CoreGsonHelper.getInstance().toJson(tasks, new TypeToken<List<TaskHistory>>() {
     }.getType());
   }
 
-  public static List<QueryUnitHistory> fromJsonQueryUnits(String json) {
+  public static List<TaskHistory> fromJsonTasks(String json) {
     if (json == null || json.trim().isEmpty()) {
-      return new ArrayList<QueryUnitHistory>();
+      return new ArrayList<TaskHistory>();
     }
-    return CoreGsonHelper.getInstance().fromJson(json, new TypeToken<List<QueryUnitHistory>>() {
+    return CoreGsonHelper.getInstance().fromJson(json, new TypeToken<List<TaskHistory>>() {
     }.getType());
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/util/history/TaskHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/TaskHistory.java b/tajo-core/src/main/java/org/apache/tajo/util/history/TaskHistory.java
new file mode 100644
index 0000000..35c56e1
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/TaskHistory.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.history;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.json.GsonObject;
+
+public class TaskHistory implements GsonObject {
+  @Expose private String id;
+  @Expose private String hostAndPort;
+  @Expose private int httpPort;
+  @Expose private String state;
+  @Expose private float progress;
+  @Expose private long launchTime;
+  @Expose private long finishTime;
+  @Expose private int retryCount;
+
+  @Expose private int numShuffles;
+  @Expose private String shuffleKey;
+  @Expose private String shuffleFileName;
+
+  @Expose private String[] dataLocations;
+  @Expose private String[] fragments;
+  @Expose private String[][] fetchs;
+
+  public String getId() {
+    return id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  public String getHostAndPort() {
+    return hostAndPort;
+  }
+
+  public void setHostAndPort(String hostAndPort) {
+    this.hostAndPort = hostAndPort;
+  }
+
+  public int getHttpPort() {
+    return httpPort;
+  }
+
+  public void setHttpPort(int httpPort) {
+    this.httpPort = httpPort;
+  }
+
+  public int getRetryCount() {
+    return retryCount;
+  }
+
+  public void setRetryCount(int retryCount) {
+    this.retryCount = retryCount;
+  }
+
+  public String getState() {
+    return state;
+  }
+
+  public void setState(String state) {
+    this.state = state;
+  }
+
+  public float getProgress() {
+    return progress;
+  }
+
+  public void setProgress(float progress) {
+    this.progress = progress;
+  }
+
+  public long getLaunchTime() {
+    return launchTime;
+  }
+
+  public void setLaunchTime(long launchTime) {
+    this.launchTime = launchTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  public int getNumShuffles() {
+    return numShuffles;
+  }
+
+  public void setNumShuffles(int numShuffles) {
+    this.numShuffles = numShuffles;
+  }
+
+  public String getShuffleKey() {
+    return shuffleKey;
+  }
+
+  public void setShuffleKey(String shuffleKey) {
+    this.shuffleKey = shuffleKey;
+  }
+
+  public String getShuffleFileName() {
+    return shuffleFileName;
+  }
+
+  public void setShuffleFileName(String shuffleFileName) {
+    this.shuffleFileName = shuffleFileName;
+  }
+
+  public String[] getDataLocations() {
+    return dataLocations;
+  }
+
+  public void setDataLocations(String[] dataLocations) {
+    this.dataLocations = dataLocations;
+  }
+
+  public String[] getFragments() {
+    return fragments;
+  }
+
+  public void setFragments(String[] fragments) {
+    this.fragments = fragments;
+  }
+
+  public String[][] getFetchs() {
+    return fetchs;
+  }
+
+  public void setFetchs(String[][] fetchs) {
+    this.fetchs = fetchs;
+  }
+
+  @Override
+  public String toJson() {
+    return CoreGsonHelper.toJson(this, TaskHistory.class);
+  }
+
+  public long getRunningTime() {
+    if(finishTime > 0) {
+      return finishTime - launchTime;
+    } else {
+      return 0;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index f18723f..dd3ee68 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
@@ -92,7 +92,7 @@ public class ExecutionBlockContext {
   private AtomicBoolean stop = new AtomicBoolean();
 
   // It keeps all of the query unit attempts while a TaskRunner is running.
-  private final ConcurrentMap<QueryUnitAttemptId, Task> tasks = Maps.newConcurrentMap();
+  private final ConcurrentMap<TaskAttemptId, Task> tasks = Maps.newConcurrentMap();
 
   private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap();
 
@@ -242,12 +242,12 @@ public class ExecutionBlockContext {
     return executionBlockId;
   }
 
-  public Map<QueryUnitAttemptId, Task> getTasks() {
+  public Map<TaskAttemptId, Task> getTasks() {
     return tasks;
   }
 
-  public Task getTask(QueryUnitAttemptId queryUnitAttemptId){
-    return tasks.get(queryUnitAttemptId);
+  public Task getTask(TaskAttemptId taskAttemptId){
+    return tasks.get(taskAttemptId);
   }
 
   public void stopTaskRunner(String id){
@@ -258,7 +258,7 @@ public class ExecutionBlockContext {
     return manager.getTaskRunner(taskRunnerId);
   }
 
-  public void addTaskHistory(String taskRunnerId, QueryUnitAttemptId quAttemptId, TaskHistory taskHistory) {
+  public void addTaskHistory(String taskRunnerId, TaskAttemptId quAttemptId, TaskHistory taskHistory) {
     histories.get(taskRunnerId).addTaskHistory(quAttemptId, taskHistory);
   }