You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/19 13:48:56 UTC

[08/13] tajo git commit: TAJO-324: Rename the prefix 'QueryUnit' to Task.

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java
deleted file mode 100644
index 6e0d9fd..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import com.google.protobuf.RpcCallback;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
-import org.apache.tajo.master.querymaster.QueryUnitAttempt;
-import org.apache.tajo.master.container.TajoContainerId;
-
-public class QueryUnitAttemptScheduleEvent extends TaskSchedulerEvent {
-  private final QueryUnitAttemptScheduleContext context;
-  private final QueryUnitAttempt queryUnitAttempt;
-
-  public QueryUnitAttemptScheduleEvent(EventType eventType, ExecutionBlockId executionBlockId,
-                                       QueryUnitAttemptScheduleContext context, QueryUnitAttempt queryUnitAttempt) {
-    super(eventType, executionBlockId);
-    this.context = context;
-    this.queryUnitAttempt = queryUnitAttempt;
-  }
-
-  public QueryUnitAttempt getQueryUnitAttempt() {
-    return queryUnitAttempt;
-  }
-
-  public QueryUnitAttemptScheduleContext getContext() {
-    return context;
-  }
-
-  public static class QueryUnitAttemptScheduleContext {
-    private TajoContainerId containerId;
-    private String host;
-    private RpcCallback<QueryUnitRequestProto> callback;
-
-    public QueryUnitAttemptScheduleContext() {
-
-    }
-
-    public QueryUnitAttemptScheduleContext(TajoContainerId containerId,
-                                           String host,
-                                           RpcCallback<QueryUnitRequestProto> callback) {
-      this.containerId = containerId;
-      this.host = host;
-      this.callback = callback;
-    }
-
-    public TajoContainerId getContainerId() {
-      return containerId;
-    }
-
-    public void setContainerId(TajoContainerId containerId) {
-      this.containerId = containerId;
-    }
-
-    public String getHost() {
-      return host;
-    }
-
-    public void setHost(String host) {
-      this.host = host;
-    }
-
-    public RpcCallback<QueryUnitRequestProto> getCallback() {
-      return callback;
-    }
-
-    public void setCallback(RpcCallback<QueryUnitRequestProto> callback) {
-      this.callback = callback;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
index 8003ef3..79b6e2e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
@@ -30,7 +30,7 @@ public enum SubQueryEventType {
   SQ_KILL,
   SQ_LAUNCH,
 
-  // Producer: QueryUnit
+  // Producer: Task
   SQ_TASK_COMPLETED,
   SQ_FAILED,
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
index 0502534..816bc48 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
@@ -18,22 +18,22 @@
 
 package org.apache.tajo.master.event;
 
-import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.TaskId;
 import org.apache.tajo.master.TaskState;
 
 /**
  * Event Class: From Task to SubQuery
  */
 public class SubQueryTaskEvent extends SubQueryEvent {
-  private QueryUnitId taskId;
+  private TaskId taskId;
   private TaskState state;
-  public SubQueryTaskEvent(QueryUnitId taskId, TaskState state) {
+  public SubQueryTaskEvent(TaskId taskId, TaskState state) {
     super(taskId.getExecutionBlockId(), SubQueryEventType.SQ_TASK_COMPLETED);
     this.taskId = taskId;
     this.state = state;
   }
 
-  public QueryUnitId getTaskId() {
+  public TaskId getTaskId() {
     return this.taskId;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
index 3b9edcb..1611370 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
@@ -18,7 +18,7 @@
 
 package org.apache.tajo.master.event;
 
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.container.TajoContainerId;
 
@@ -26,7 +26,7 @@ public class TaskAttemptAssignedEvent extends TaskAttemptEvent {
   private final TajoContainerId cId;
   private final WorkerConnectionInfo workerConnectionInfo;
 
-  public TaskAttemptAssignedEvent(QueryUnitAttemptId id, TajoContainerId cId,
+  public TaskAttemptAssignedEvent(TaskAttemptId id, TajoContainerId cId,
                                   WorkerConnectionInfo connectionInfo) {
     super(id, TaskAttemptEventType.TA_ASSIGNED);
     this.cId = cId;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java
index f2df144..1b84de0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEvent.java
@@ -19,18 +19,18 @@
 package org.apache.tajo.master.event;
 
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 
 public class TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType> {
-  private final QueryUnitAttemptId id;
+  private final TaskAttemptId id;
 
-  public TaskAttemptEvent(QueryUnitAttemptId id,
+  public TaskAttemptEvent(TaskAttemptId id,
                           TaskAttemptEventType taskAttemptEventType) {
     super(taskAttemptEventType);
     this.id = id;
   }
 
-  public QueryUnitAttemptId getTaskAttemptId() {
+  public TaskAttemptId getTaskAttemptId() {
     return this.id;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java
index 8f153af..3274ef7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptScheduleEvent.java
@@ -19,13 +19,13 @@
 package org.apache.tajo.master.event;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 
 public class TaskAttemptScheduleEvent extends TaskAttemptEvent {
   private Configuration conf;
 
   public TaskAttemptScheduleEvent(final Configuration conf,
-                                  final QueryUnitAttemptId id,
+                                  final TaskAttemptId id,
                                   final TaskAttemptEventType taskAttemptEventType) {
     super(id, taskAttemptEventType);
     this.conf = conf;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
index d980e05..8c5f016 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
@@ -18,13 +18,13 @@
 
 package org.apache.tajo.master.event;
 
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TaskStatusProto;
 
 public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
   private final TaskStatusProto status;
 
-  public TaskAttemptStatusUpdateEvent(final QueryUnitAttemptId id,
+  public TaskAttemptStatusUpdateEvent(final TaskAttemptId id,
                                       TaskStatusProto status) {
     super(id, TaskAttemptEventType.TA_UPDATE);
     this.status = status;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java
new file mode 100644
index 0000000..91ef942
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import com.google.protobuf.RpcCallback;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.querymaster.TaskAttempt;
+import org.apache.tajo.master.container.TajoContainerId;
+
+public class TaskAttemptToSchedulerEvent extends TaskSchedulerEvent {
+  private final TaskAttemptScheduleContext context;
+  private final TaskAttempt taskAttempt;
+
+  public TaskAttemptToSchedulerEvent(EventType eventType, ExecutionBlockId executionBlockId,
+                                     TaskAttemptScheduleContext context, TaskAttempt taskAttempt) {
+    super(eventType, executionBlockId);
+    this.context = context;
+    this.taskAttempt = taskAttempt;
+  }
+
+  public TaskAttempt getTaskAttempt() {
+    return taskAttempt;
+  }
+
+  public TaskAttemptScheduleContext getContext() {
+    return context;
+  }
+
+  public static class TaskAttemptScheduleContext {
+    private TajoContainerId containerId;
+    private String host;
+    private RpcCallback<TajoWorkerProtocol.TaskRequestProto> callback;
+
+    public TaskAttemptScheduleContext() {
+
+    }
+
+    public TaskAttemptScheduleContext(TajoContainerId containerId,
+                                      String host,
+                                      RpcCallback<TajoWorkerProtocol.TaskRequestProto> callback) {
+      this.containerId = containerId;
+      this.host = host;
+      this.callback = callback;
+    }
+
+    public TajoContainerId getContainerId() {
+      return containerId;
+    }
+
+    public void setContainerId(TajoContainerId containerId) {
+      this.containerId = containerId;
+    }
+
+    public String getHost() {
+      return host;
+    }
+
+    public void setHost(String host) {
+      this.host = host;
+    }
+
+    public RpcCallback<TajoWorkerProtocol.TaskRequestProto> getCallback() {
+      return callback;
+    }
+
+    public void setCallback(RpcCallback<TajoWorkerProtocol.TaskRequestProto> callback) {
+      this.callback = callback;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
index 3ee389a..20204aa 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
@@ -18,14 +18,14 @@
 
 package org.apache.tajo.master.event;
 
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
 
 public class TaskCompletionEvent extends TaskAttemptEvent {
   private TaskCompletionReport report;
 
   public TaskCompletionEvent(TaskCompletionReport report) {
-    super(new QueryUnitAttemptId(report.getId()), TaskAttemptEventType.TA_DONE);
+    super(new TaskAttemptId(report.getId()), TaskAttemptEventType.TA_DONE);
     this.report = report;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEvent.java
index 234491b..377a8e0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEvent.java
@@ -19,17 +19,17 @@
 package org.apache.tajo.master.event;
 
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.TaskId;
 
 public class TaskEvent extends AbstractEvent<TaskEventType> {
-  private final QueryUnitId id;
+  private final TaskId id;
 
-  public TaskEvent(QueryUnitId id, TaskEventType taskEventType) {
+  public TaskEvent(TaskId id, TaskEventType taskEventType) {
     super(taskEventType);
     this.id = id;
   }
 
-  public QueryUnitId getTaskId() {
+  public TaskId getTaskId() {
     return id;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
index a4d9900..03888bd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
@@ -18,19 +18,19 @@
 
 package org.apache.tajo.master.event;
 
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TaskFatalErrorReport;
 
 public class TaskFatalErrorEvent extends TaskAttemptEvent {
   private final String message;
 
   public TaskFatalErrorEvent(TaskFatalErrorReport report) {
-    super(new QueryUnitAttemptId(report.getId()),
+    super(new TaskAttemptId(report.getId()),
         TaskAttemptEventType.TA_FATAL_ERROR);
     this.message = report.getErrorMessage();
   }
 
-  public TaskFatalErrorEvent(QueryUnitAttemptId attemptId, String message) {
+  public TaskFatalErrorEvent(TaskAttemptId attemptId, String message) {
     super(attemptId, TaskAttemptEventType.TA_FATAL_ERROR);
     this.message = message;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
index 9e8e3dd..3f72ed9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
@@ -21,7 +21,8 @@ package org.apache.tajo.master.event;
 import com.google.protobuf.RpcCallback;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProto;
 import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType;
 import org.apache.tajo.master.container.TajoContainerId;
 
@@ -35,12 +36,12 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
   private final TajoContainerId containerId;
   private final ExecutionBlockId executionBlockId;
 
-  private final RpcCallback<QueryUnitRequestProto> callback;
+  private final RpcCallback<TaskRequestProto> callback;
 
   public TaskRequestEvent(int workerId,
                           TajoContainerId containerId,
                           ExecutionBlockId executionBlockId,
-                          RpcCallback<QueryUnitRequestProto> callback) {
+                          RpcCallback<TaskRequestProto> callback) {
     super(TaskRequestEventType.TASK_REQ);
     this.workerId = workerId;
     this.containerId = containerId;
@@ -60,7 +61,7 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
     return executionBlockId;
   }
 
-  public RpcCallback<QueryUnitRequestProto> getCallback() {
+  public RpcCallback<TajoWorkerProtocol.TaskRequestProto> getCallback() {
     return this.callback;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java
index 28654f0..a4f120c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptEvent.java
@@ -18,17 +18,17 @@
 
 package org.apache.tajo.master.event;
 
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 
 public class TaskTAttemptEvent extends TaskEvent {
-  private final QueryUnitAttemptId attemptId;
-  public TaskTAttemptEvent(QueryUnitAttemptId attemptId,
+  private final TaskAttemptId attemptId;
+  public TaskTAttemptEvent(TaskAttemptId attemptId,
                            TaskEventType eventType) {
-    super(attemptId.getQueryUnitId(), eventType);
+    super(attemptId.getTaskId(), eventType);
     this.attemptId = attemptId;
   }
 
-  public QueryUnitAttemptId getTaskAttemptId() {
+  public TaskAttemptId getTaskAttemptId() {
     return attemptId;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index f4bd8a3..e7e2bc0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -122,7 +122,7 @@ public class QueryMasterManagerService extends CompositeService
 
   @Override
   public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request,
-                      RpcCallback<TajoWorkerProtocol.QueryUnitRequestProto> done) {
+                      RpcCallback<TajoWorkerProtocol.TaskRequestProto> done) {
     try {
       ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
       QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
@@ -144,15 +144,15 @@ public class QueryMasterManagerService extends CompositeService
   public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
                            RpcCallback<PrimitiveProtos.BoolProto> done) {
     try {
-      QueryId queryId = new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId());
-      QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
+      QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId());
+      TaskAttemptId attemptId = new TaskAttemptId(request.getId());
       QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
       if (queryMasterTask == null) {
         queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
       }
-      SubQuery sq = queryMasterTask.getQuery().getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
-      QueryUnit task = sq.getQueryUnit(attemptId.getQueryUnitId());
-      QueryUnitAttempt attempt = task.getAttempt(attemptId.getId());
+      SubQuery sq = queryMasterTask.getQuery().getSubQuery(attemptId.getTaskId().getExecutionBlockId());
+      Task task = sq.getTask(attemptId.getTaskId());
+      TaskAttempt attempt = task.getAttempt(attemptId.getId());
 
       if(LOG.isDebugEnabled()){
         LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
@@ -161,10 +161,10 @@ public class QueryMasterManagerService extends CompositeService
       if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
         LOG.warn(attemptId + " Killed");
         attempt.handle(
-            new TaskAttemptEvent(new QueryUnitAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
+            new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
       } else {
         queryMasterTask.getEventHandler().handle(
-            new TaskAttemptStatusUpdateEvent(new QueryUnitAttemptId(request.getId()), request));
+            new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request));
       }
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Exception e) {
@@ -185,11 +185,11 @@ public class QueryMasterManagerService extends CompositeService
                          RpcCallback<PrimitiveProtos.BoolProto> done) {
     try {
       QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
-          new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
+          new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
       if (queryMasterTask != null) {
         queryMasterTask.handleTaskFailed(report);
       } else {
-        LOG.warn("No QueryMasterTask: " + new QueryUnitAttemptId(report.getId()));
+        LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId()));
       }
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Exception e) {
@@ -203,7 +203,7 @@ public class QueryMasterManagerService extends CompositeService
                    RpcCallback<PrimitiveProtos.BoolProto> done) {
     try {
       QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
-          new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
+          new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
       if (queryMasterTask != null) {
         queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 8f63416..9ab4f0a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -285,12 +285,12 @@ public class QueryMasterTask extends CompositeService {
   private class TaskEventDispatcher
       implements EventHandler<TaskEvent> {
     public void handle(TaskEvent event) {
-      QueryUnitId taskId = event.getTaskId();
+      TaskId taskId = event.getTaskId();
       if(LOG.isDebugEnabled()) {
         LOG.debug("TaskEventDispatcher>" + taskId + "," + event.getType());
       }
-      QueryUnit task = query.getSubQuery(taskId.getExecutionBlockId()).
-          getQueryUnit(taskId);
+      Task task = query.getSubQuery(taskId.getExecutionBlockId()).
+          getTask(taskId);
       task.handle(event);
     }
   }
@@ -298,10 +298,10 @@ public class QueryMasterTask extends CompositeService {
   private class TaskAttemptEventDispatcher
       implements EventHandler<TaskAttemptEvent> {
     public void handle(TaskAttemptEvent event) {
-      QueryUnitAttemptId attemptId = event.getTaskAttemptId();
-      SubQuery subQuery = query.getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
-      QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
-      QueryUnitAttempt attempt = task.getAttempt(attemptId);
+      TaskAttemptId attemptId = event.getTaskAttemptId();
+      SubQuery subQuery = query.getSubQuery(attemptId.getTaskId().getExecutionBlockId());
+      Task task = subQuery.getTask(attemptId.getTaskId());
+      TaskAttempt attempt = task.getAttempt(attemptId);
       attempt.handle(event);
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
deleted file mode 100644
index 75402c2..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ /dev/null
@@ -1,907 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.QueryUnitId;
-import org.apache.tajo.TajoProtos.TaskAttemptState;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto;
-import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
-import org.apache.tajo.master.FragmentPair;
-import org.apache.tajo.master.TaskState;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
-import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.storage.DataLocation;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.util.Pair;
-import org.apache.tajo.util.TajoIdUtils;
-import org.apache.tajo.util.history.QueryUnitHistory;
-import org.apache.tajo.worker.FetchImpl;
-
-import java.net.URI;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
-
-public class QueryUnit implements EventHandler<TaskEvent> {
-  /** Class Logger */
-  private static final Log LOG = LogFactory.getLog(QueryUnit.class);
-
-  private final Configuration systemConf;
-	private QueryUnitId taskId;
-  private EventHandler eventHandler;
-	private StoreTableNode store = null;
-	private LogicalNode plan = null;
-	private List<ScanNode> scan;
-	
-	private Map<String, Set<FragmentProto>> fragMap;
-	private Map<String, Set<FetchImpl>> fetchMap;
-
-  private int totalFragmentNum;
-
-  private List<ShuffleFileOutput> shuffleFileOutputs;
-	private TableStats stats;
-  private final boolean isLeafTask;
-  private List<IntermediateEntry> intermediateData;
-
-  private Map<QueryUnitAttemptId, QueryUnitAttempt> attempts;
-  private final int maxAttempts = 3;
-  private Integer nextAttempt = -1;
-  private QueryUnitAttemptId lastAttemptId;
-
-  private QueryUnitAttemptId successfulAttempt;
-  private String succeededHost;
-  private int succeededHostPort;
-  private int succeededPullServerPort;
-
-  private int failedAttempts;
-  private int finishedAttempts; // finish are total of success, failed and killed
-
-  private long launchTime;
-  private long finishTime;
-
-  private List<DataLocation> dataLocations = Lists.newArrayList();
-
-  private static final AttemptKilledTransition ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
-
-  private QueryUnitHistory finalQueryUnitHistory;
-
-  protected static final StateMachineFactory
-      <QueryUnit, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
-      new StateMachineFactory <QueryUnit, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
-
-          // Transitions from NEW state
-          .addTransition(TaskState.NEW, TaskState.SCHEDULED,
-              TaskEventType.T_SCHEDULE,
-              new InitialScheduleTransition())
-          .addTransition(TaskState.NEW, TaskState.KILLED,
-              TaskEventType.T_KILL,
-              new KillNewTaskTransition())
-
-          // Transitions from SCHEDULED state
-          .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
-              TaskEventType.T_ATTEMPT_LAUNCHED,
-              new AttemptLaunchedTransition())
-          .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT,
-              TaskEventType.T_KILL,
-              new KillTaskTransition())
-
-          // Transitions from RUNNING state
-          .addTransition(TaskState.RUNNING, TaskState.RUNNING,
-              TaskEventType.T_ATTEMPT_LAUNCHED)
-          .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
-              TaskEventType.T_ATTEMPT_SUCCEEDED,
-              new AttemptSucceededTransition())
-          .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT,
-              TaskEventType.T_KILL,
-              new KillTaskTransition())
-          .addTransition(TaskState.RUNNING,
-              EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
-              TaskEventType.T_ATTEMPT_FAILED,
-              new AttemptFailedOrRetryTransition())
-
-          // Transitions from KILL_WAIT state
-          .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
-              TaskEventType.T_ATTEMPT_KILLED,
-              ATTEMPT_KILLED_TRANSITION)
-          .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
-              TaskEventType.T_ATTEMPT_LAUNCHED,
-              new KillTaskTransition())
-          .addTransition(TaskState.KILL_WAIT, TaskState.FAILED,
-              TaskEventType.T_ATTEMPT_FAILED,
-              new AttemptFailedTransition())
-          .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
-              TaskEventType.T_ATTEMPT_SUCCEEDED,
-              ATTEMPT_KILLED_TRANSITION)
-              // Ignore-able transitions.
-          .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
-              EnumSet.of(
-                  TaskEventType.T_KILL,
-                  TaskEventType.T_SCHEDULE))
-
-          // Transitions from SUCCEEDED state
-          // Ignore-able transitions
-          .addTransition(TaskState.SUCCEEDED, TaskState.SUCCEEDED,
-              EnumSet.of(TaskEventType.T_KILL,
-                  TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED))
-
-          // Transitions from FAILED state
-          // Ignore-able transitions
-          .addTransition(TaskState.FAILED, TaskState.FAILED,
-              EnumSet.of(TaskEventType.T_KILL,
-                  TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED))
-
-          // Transitions from KILLED state
-          .addTransition(TaskState.KILLED, TaskState.KILLED, TaskEventType.T_ATTEMPT_KILLED, new KillTaskTransition())
-          // Ignore-able transitions
-          .addTransition(TaskState.KILLED, TaskState.KILLED,
-              EnumSet.of(
-                  TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED))
-
-          .installTopology();
-
-  private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine;
-
-
-  private final Lock readLock;
-  private final Lock writeLock;
-  private QueryUnitAttemptScheduleContext scheduleContext;
-
-	public QueryUnit(Configuration conf, QueryUnitAttemptScheduleContext scheduleContext,
-                   QueryUnitId id, boolean isLeafTask, EventHandler eventHandler) {
-    this.systemConf = conf;
-		this.taskId = id;
-    this.eventHandler = eventHandler;
-    this.isLeafTask = isLeafTask;
-		scan = new ArrayList<ScanNode>();
-    fetchMap = Maps.newHashMap();
-    fragMap = Maps.newHashMap();
-    shuffleFileOutputs = new ArrayList<ShuffleFileOutput>();
-    attempts = Collections.emptyMap();
-    lastAttemptId = null;
-    nextAttempt = -1;
-    failedAttempts = 0;
-
-    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-    this.readLock = readWriteLock.readLock();
-    this.writeLock = readWriteLock.writeLock();
-    this.scheduleContext = scheduleContext;
-
-    stateMachine = stateMachineFactory.make(this);
-    totalFragmentNum = 0;
-	}
-
-  public boolean isLeafTask() {
-    return this.isLeafTask;
-  }
-
-  public TaskState getState() {
-    readLock.lock();
-    try {
-      return stateMachine.getCurrentState();
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  public TaskAttemptState getLastAttemptStatus() {
-    QueryUnitAttempt lastAttempt = getLastAttempt();
-    if (lastAttempt != null) {
-      return lastAttempt.getState();
-    } else {
-      return TaskAttemptState.TA_ASSIGNED;
-    }
-  }
-
-  public QueryUnitHistory getQueryUnitHistory() {
-    if (finalQueryUnitHistory != null) {
-      if (finalQueryUnitHistory.getFinishTime() == 0) {
-        finalQueryUnitHistory = makeQueryUnitHistory();
-      }
-      return finalQueryUnitHistory;
-    } else {
-      return makeQueryUnitHistory();
-    }
-  }
-
-  private QueryUnitHistory makeQueryUnitHistory() {
-    QueryUnitHistory queryUnitHistory = new QueryUnitHistory();
-
-    QueryUnitAttempt lastAttempt = getLastAttempt();
-    if (lastAttempt != null) {
-      queryUnitHistory.setId(lastAttempt.getId().toString());
-      queryUnitHistory.setState(lastAttempt.getState().toString());
-      queryUnitHistory.setProgress(lastAttempt.getProgress());
-    }
-    queryUnitHistory.setHostAndPort(succeededHost + ":" + succeededHostPort);
-    queryUnitHistory.setRetryCount(this.getRetryCount());
-    queryUnitHistory.setLaunchTime(launchTime);
-    queryUnitHistory.setFinishTime(finishTime);
-
-    queryUnitHistory.setNumShuffles(getShuffleOutpuNum());
-    if (!getShuffleFileOutputs().isEmpty()) {
-      ShuffleFileOutput shuffleFileOutputs = getShuffleFileOutputs().get(0);
-      if (queryUnitHistory.getNumShuffles() > 0) {
-        queryUnitHistory.setShuffleKey("" + shuffleFileOutputs.getPartId());
-        queryUnitHistory.setShuffleFileName(shuffleFileOutputs.getFileName());
-      }
-    }
-
-    List<String> fragmentList = new ArrayList<String>();
-    for (FragmentProto eachFragment : getAllFragments()) {
-      try {
-        Fragment fragment = FragmentConvertor.convert(systemConf, eachFragment);
-        fragmentList.add(fragment.toString());
-      } catch (Exception e) {
-        LOG.error(e.getMessage());
-        fragmentList.add("ERROR: " + eachFragment.getStoreType() + "," + eachFragment.getId() + ": " + e.getMessage());
-      }
-    }
-    queryUnitHistory.setFragments(fragmentList.toArray(new String[]{}));
-
-    List<String[]> fetchList = new ArrayList<String[]>();
-    for (Map.Entry<String, Set<FetchImpl>> e : getFetchMap().entrySet()) {
-      for (FetchImpl f : e.getValue()) {
-        for (URI uri : f.getSimpleURIs()){
-          fetchList.add(new String[] {e.getKey(), uri.toString()});
-        }
-      }
-    }
-
-    queryUnitHistory.setFetchs(fetchList.toArray(new String[][]{}));
-
-    List<String> dataLocationList = new ArrayList<String>();
-    for(DataLocation eachLocation: getDataLocations()) {
-      dataLocationList.add(eachLocation.toString());
-    }
-
-    queryUnitHistory.setDataLocations(dataLocationList.toArray(new String[]{}));
-    return queryUnitHistory;
-  }
-
-	public void setLogicalPlan(LogicalNode plan) {
-	  this.plan = plan;
-
-	  LogicalNode node = plan;
-	  ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
-	  s.add(node);
-	  while (!s.isEmpty()) {
-	    node = s.remove(s.size()-1);
-	    if (node instanceof UnaryNode) {
-	      UnaryNode unary = (UnaryNode) node;
-	      s.add(s.size(), unary.getChild());
-	    } else if (node instanceof BinaryNode) {
-	      BinaryNode binary = (BinaryNode) node;
-	      s.add(s.size(), binary.getLeftChild());
-	      s.add(s.size(), binary.getRightChild());
-	    } else if (node instanceof ScanNode) {
-	      scan.add((ScanNode)node);
-	    } else if (node instanceof TableSubQueryNode) {
-        s.add(((TableSubQueryNode) node).getSubQuery());
-      }
-	  }
-	}
-
-  private void addDataLocation(Fragment fragment) {
-    String[] hosts = fragment.getHosts();
-    int[] diskIds = null;
-    if (fragment instanceof FileFragment) {
-      diskIds = ((FileFragment)fragment).getDiskIds();
-    }
-    for (int i = 0; i < hosts.length; i++) {
-      dataLocations.add(new DataLocation(hosts[i], diskIds == null ? -1 : diskIds[i]));
-    }
-  }
-
-  public void addFragment(Fragment fragment, boolean useDataLocation) {
-    Set<FragmentProto> fragmentProtos;
-    if (fragMap.containsKey(fragment.getTableName())) {
-      fragmentProtos = fragMap.get(fragment.getTableName());
-    } else {
-      fragmentProtos = new HashSet<FragmentProto>();
-      fragMap.put(fragment.getTableName(), fragmentProtos);
-    }
-    fragmentProtos.add(fragment.getProto());
-    if (useDataLocation) {
-      addDataLocation(fragment);
-    }
-    totalFragmentNum++;
-  }
-
-  public void addFragments(Collection<Fragment> fragments) {
-    for (Fragment eachFragment: fragments) {
-      addFragment(eachFragment, false);
-    }
-  }
-
-  public void setFragment(FragmentPair[] fragmentPairs) {
-    for (FragmentPair eachFragmentPair : fragmentPairs) {
-      this.addFragment(eachFragmentPair.getLeftFragment(), true);
-      if (eachFragmentPair.getRightFragment() != null) {
-        this.addFragment(eachFragmentPair.getRightFragment(), true);
-      }
-    }
-  }
-
-  public List<DataLocation> getDataLocations() {
-    return dataLocations;
-  }
-
-  public String getSucceededHost() {
-    return succeededHost;
-  }
-	
-	public void addFetches(String tableId, Collection<FetchImpl> fetches) {
-	  Set<FetchImpl> fetchSet;
-    if (fetchMap.containsKey(tableId)) {
-      fetchSet = fetchMap.get(tableId);
-    } else {
-      fetchSet = Sets.newHashSet();
-    }
-    fetchSet.addAll(fetches);
-    fetchMap.put(tableId, fetchSet);
-	}
-	
-	public void setFetches(Map<String, Set<FetchImpl>> fetches) {
-	  this.fetchMap.clear();
-	  this.fetchMap.putAll(fetches);
-	}
-
-  public Collection<FragmentProto> getAllFragments() {
-    Set<FragmentProto> fragmentProtos = new HashSet<FragmentProto>();
-    for (Set<FragmentProto> eachFragmentSet : fragMap.values()) {
-      fragmentProtos.addAll(eachFragmentSet);
-    }
-    return fragmentProtos;
-  }
-	
-	public LogicalNode getLogicalPlan() {
-	  return this.plan;
-	}
-	
-	public QueryUnitId getId() {
-		return taskId;
-	}
-	
-	public Collection<FetchImpl> getFetchHosts(String tableId) {
-	  return fetchMap.get(tableId);
-	}
-	
-	public Collection<Set<FetchImpl>> getFetches() {
-	  return fetchMap.values();
-	}
-
-  public Map<String, Set<FetchImpl>> getFetchMap() {
-    return fetchMap;
-  }
-	
-	public Collection<FetchImpl> getFetch(ScanNode scan) {
-	  return this.fetchMap.get(scan.getTableName());
-	}
-	
-	public ScanNode[] getScanNodes() {
-	  return this.scan.toArray(new ScanNode[scan.size()]);
-	}
-	
-	@Override
-	public String toString() {
-    StringBuilder builder = new StringBuilder();
-    builder.append(plan.getType() + " \n");
-		for (Entry<String, Set<FragmentProto>> e : fragMap.entrySet()) {
-		  builder.append(e.getKey()).append(" : ");
-      for (FragmentProto fragment : e.getValue()) {
-        builder.append(fragment).append(", ");
-      }
-		}
-		for (Entry<String, Set<FetchImpl>> e : fetchMap.entrySet()) {
-      builder.append(e.getKey()).append(" : ");
-      for (FetchImpl t : e.getValue()) {
-        for (URI uri : t.getURIs()){
-          builder.append(uri).append(" ");
-        }
-      }
-    }
-		
-		return builder.toString();
-	}
-	
-	public void setStats(TableStats stats) {
-	  this.stats = stats;
-	}
-	
-	public void setShuffleFileOutputs(List<ShuffleFileOutput> partitions) {
-	  this.shuffleFileOutputs = Collections.unmodifiableList(partitions);
-	}
-	
-	public TableStats getStats() {
-	  return this.stats;
-	}
-	
-	public List<ShuffleFileOutput> getShuffleFileOutputs() {
-	  return this.shuffleFileOutputs;
-	}
-	
-	public int getShuffleOutpuNum() {
-	  return this.shuffleFileOutputs.size();
-	}
-
-  public QueryUnitAttempt newAttempt() {
-    QueryUnitAttempt attempt = new QueryUnitAttempt(scheduleContext,
-        QueryIdFactory.newQueryUnitAttemptId(this.getId(), ++nextAttempt),
-        this, eventHandler);
-    lastAttemptId = attempt.getId();
-    return attempt;
-  }
-
-  public QueryUnitAttempt getAttempt(QueryUnitAttemptId attemptId) {
-    return attempts.get(attemptId);
-  }
-
-  public QueryUnitAttempt getAttempt(int attempt) {
-    return this.attempts.get(QueryIdFactory.newQueryUnitAttemptId(this.getId(), attempt));
-  }
-
-  public QueryUnitAttempt getLastAttempt() {
-    return getAttempt(this.lastAttemptId);
-  }
-
-  public QueryUnitAttempt getSuccessfulAttempt() {
-    readLock.lock();
-    try {
-      if (null == successfulAttempt) {
-        return null;
-      }
-      return attempts.get(successfulAttempt);
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  public int getRetryCount () {
-    return this.nextAttempt;
-  }
-
-  public int getTotalFragmentNum() {
-    return totalFragmentNum;
-  }
-
-  private static class InitialScheduleTransition implements
-    SingleArcTransition<QueryUnit, TaskEvent> {
-
-    @Override
-    public void transition(QueryUnit task, TaskEvent taskEvent) {
-      task.addAndScheduleAttempt();
-    }
-  }
-
-  public long getLaunchTime() {
-    return launchTime;
-  }
-
-  public long getFinishTime() {
-    return finishTime;
-  }
-
-  @VisibleForTesting
-  public void setLaunchTime(long launchTime) {
-    this.launchTime = launchTime;
-  }
-
-  @VisibleForTesting
-  public void setFinishTime(long finishTime) {
-    this.finishTime = finishTime;
-  }
-
-  public long getRunningTime() {
-    if(finishTime > 0) {
-      return finishTime - launchTime;
-    } else {
-      return System.currentTimeMillis() - launchTime;
-    }
-  }
-
-  // This is always called in the Write Lock
-  private void addAndScheduleAttempt() {
-    // Create new task attempt
-    QueryUnitAttempt attempt = newAttempt();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Created attempt " + attempt.getId());
-    }
-    switch (attempts.size()) {
-      case 0:
-        attempts = Collections.singletonMap(attempt.getId(), attempt);
-        break;
-
-      case 1:
-        Map<QueryUnitAttemptId, QueryUnitAttempt> newAttempts
-            = new LinkedHashMap<QueryUnitAttemptId, QueryUnitAttempt>(3);
-        newAttempts.putAll(attempts);
-        attempts = newAttempts;
-        attempts.put(attempt.getId(), attempt);
-        break;
-
-      default:
-        attempts.put(attempt.getId(), attempt);
-        break;
-    }
-
-    if (failedAttempts > 0) {
-      eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(),
-          TaskAttemptEventType.TA_RESCHEDULE));
-    } else {
-      eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(),
-          TaskAttemptEventType.TA_SCHEDULE));
-    }
-  }
-
-  private void finishTask() {
-    this.finishTime = System.currentTimeMillis();
-    finalQueryUnitHistory = makeQueryUnitHistory();
-  }
-
-  private static class KillNewTaskTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
-
-    @Override
-    public void transition(QueryUnit task, TaskEvent taskEvent) {
-      task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED));
-    }
-  }
-
-  private static class KillTaskTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
-
-    @Override
-    public void transition(QueryUnit task, TaskEvent taskEvent) {
-      task.finishTask();
-      task.eventHandler.handle(new TaskAttemptEvent(task.lastAttemptId, TaskAttemptEventType.TA_KILL));
-    }
-  }
-
-  private static class AttemptKilledTransition implements SingleArcTransition<QueryUnit, TaskEvent>{
-
-    @Override
-    public void transition(QueryUnit task, TaskEvent event) {
-      task.finishTask();
-      task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED));
-    }
-  }
-
-  private static class AttemptSucceededTransition
-      implements SingleArcTransition<QueryUnit, TaskEvent>{
-
-    @Override
-    public void transition(QueryUnit task,
-                           TaskEvent event) {
-      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
-      QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
-
-      task.successfulAttempt = attemptEvent.getTaskAttemptId();
-      task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
-      task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort();
-      task.succeededPullServerPort = attempt.getWorkerConnectionInfo().getPullServerPort();
-
-      task.finishTask();
-      task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(), TaskState.SUCCEEDED));
-    }
-  }
-
-  private static class AttemptLaunchedTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
-    @Override
-    public void transition(QueryUnit task,
-                           TaskEvent event) {
-      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
-      QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
-      task.launchTime = System.currentTimeMillis();
-      task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
-      task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort();
-    }
-  }
-
-  private static class AttemptFailedTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
-    @Override
-    public void transition(QueryUnit task, TaskEvent event) {
-      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
-      LOG.info("=============================================================");
-      LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<");
-      LOG.info("=============================================================");
-      task.failedAttempts++;
-      task.finishedAttempts++;
-
-      task.finishTask();
-      task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED));
-    }
-  }
-
-  private static class AttemptFailedOrRetryTransition implements
-    MultipleArcTransition<QueryUnit, TaskEvent, TaskState> {
-
-    @Override
-    public TaskState transition(QueryUnit task, TaskEvent taskEvent) {
-      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent;
-      task.failedAttempts++;
-      task.finishedAttempts++;
-      boolean retry = task.failedAttempts < task.maxAttempts;
-
-      LOG.info("====================================================================================");
-      LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + ", " +
-          "retry:" + retry + ", attempts:" +  task.failedAttempts + " <<<");
-      LOG.info("====================================================================================");
-
-      if (retry) {
-        if (task.successfulAttempt == null) {
-          task.addAndScheduleAttempt();
-        }
-      } else {
-        task.finishTask();
-        task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED));
-        return TaskState.FAILED;
-      }
-
-      return task.getState();
-    }
-  }
-
-  @Override
-  public void handle(TaskEvent event) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing " + event.getTaskId() + " of type "
-          + event.getType());
-    }
-
-    try {
-      writeLock.lock();
-      TaskState oldState = getState();
-      try {
-        stateMachine.doTransition(event.getType(), event);
-      } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state"
-            + ", eventType:" + event.getType().name()
-            + ", oldState:" + oldState.name()
-            + ", nextState:" + getState().name()
-            , e);
-        eventHandler.handle(new QueryEvent(TajoIdUtils.parseQueryId(getId().toString()),
-            QueryEventType.INTERNAL_ERROR));
-      }
-
-      //notify the eventhandler of state change
-      if (LOG.isDebugEnabled()) {
-        if (oldState != getState()) {
-          LOG.debug(taskId + " Task Transitioned from " + oldState + " to "
-              + getState());
-        }
-      }
-    }
-
-    finally {
-      writeLock.unlock();
-    }
-  }
-
-  public void setIntermediateData(Collection<IntermediateEntry> partitions) {
-    this.intermediateData = new ArrayList<IntermediateEntry>(partitions);
-  }
-
-  public List<IntermediateEntry> getIntermediateData() {
-    return this.intermediateData;
-  }
-
-  public static class PullHost implements Cloneable {
-    String host;
-    int port;
-    int hashCode;
-
-    public PullHost(String pullServerAddr, int pullServerPort){
-      this.host = pullServerAddr;
-      this.port = pullServerPort;
-      this.hashCode = Objects.hashCode(host, port);
-    }
-    public String getHost() {
-      return host;
-    }
-
-    public int getPort() {
-      return this.port;
-    }
-
-    public String getPullAddress() {
-      return host + ":" + port;
-    }
-
-    @Override
-    public int hashCode() {
-      return hashCode;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj instanceof PullHost) {
-        PullHost other = (PullHost) obj;
-        return host.equals(other.host) && port == other.port;
-      }
-
-      return false;
-    }
-
-    @Override
-    public PullHost clone() throws CloneNotSupportedException {
-      PullHost newPullHost = (PullHost) super.clone();
-      newPullHost.host = host;
-      newPullHost.port = port;
-      newPullHost.hashCode = Objects.hashCode(newPullHost.host, newPullHost.port);
-      return newPullHost;
-    }
-
-    @Override
-    public String toString() {
-      return host + ":" + port;
-    }
-  }
-
-  public static class IntermediateEntry {
-    ExecutionBlockId ebId;
-    int taskId;
-    int attemptId;
-    int partId;
-    PullHost host;
-    long volume;
-    List<Pair<Long, Integer>> pages;
-    List<Pair<Long, Pair<Integer, Integer>>> failureRowNums;
-
-    public IntermediateEntry(IntermediateEntryProto proto) {
-      this.ebId = new ExecutionBlockId(proto.getEbId());
-      this.taskId = proto.getTaskId();
-      this.attemptId = proto.getAttemptId();
-      this.partId = proto.getPartId();
-
-      String[] pullHost = proto.getHost().split(":");
-      this.host = new PullHost(pullHost[0], Integer.parseInt(pullHost[1]));
-      this.volume = proto.getVolume();
-
-      failureRowNums = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
-      for (FailureIntermediateProto eachFailure: proto.getFailuresList()) {
-
-        failureRowNums.add(new Pair(eachFailure.getPagePos(),
-            new Pair(eachFailure.getStartRowNum(), eachFailure.getEndRowNum())));
-      }
-
-      pages = new ArrayList<Pair<Long, Integer>>();
-      for (IntermediateEntryProto.PageProto eachPage: proto.getPagesList()) {
-        pages.add(new Pair(eachPage.getPos(), eachPage.getLength()));
-      }
-    }
-
-    public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) {
-      this.taskId = taskId;
-      this.attemptId = attemptId;
-      this.partId = partId;
-      this.host = host;
-    }
-
-    public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host, long volume) {
-      this.taskId = taskId;
-      this.attemptId = attemptId;
-      this.partId = partId;
-      this.host = host;
-      this.volume = volume;
-    }
-
-    public ExecutionBlockId getEbId() {
-      return ebId;
-    }
-
-    public void setEbId(ExecutionBlockId ebId) {
-      this.ebId = ebId;
-    }
-
-    public int getTaskId() {
-      return this.taskId;
-    }
-
-    public int getAttemptId() {
-      return this.attemptId;
-    }
-
-    public int getPartId() {
-      return this.partId;
-    }
-
-    public PullHost getPullHost() {
-      return this.host;
-    }
-
-    public long getVolume() {
-      return this.volume;
-    }
-
-    public long setVolume(long volume) {
-      return this.volume = volume;
-    }
-
-    public List<Pair<Long, Integer>> getPages() {
-      return pages;
-    }
-
-    public void setPages(List<Pair<Long, Integer>> pages) {
-      this.pages = pages;
-    }
-
-    public List<Pair<Long, Pair<Integer, Integer>>> getFailureRowNums() {
-      return failureRowNums;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(ebId, taskId, partId, attemptId, host);
-    }
-
-    public List<Pair<Long, Long>> split(long firstSplitVolume, long splitVolume) {
-      List<Pair<Long, Long>> splits = new ArrayList<Pair<Long, Long>>();
-
-      if (pages == null || pages.isEmpty()) {
-        return splits;
-      }
-      int pageSize = pages.size();
-
-      long currentOffset = -1;
-      long currentBytes = 0;
-
-      long realSplitVolume = firstSplitVolume > 0 ? firstSplitVolume : splitVolume;
-      for (int i = 0; i < pageSize; i++) {
-        Pair<Long, Integer> eachPage = pages.get(i);
-        if (currentOffset == -1) {
-          currentOffset = eachPage.getFirst();
-        }
-        if (currentBytes > 0 && currentBytes + eachPage.getSecond() >= realSplitVolume) {
-          splits.add(new Pair(currentOffset, currentBytes));
-          currentOffset = eachPage.getFirst();
-          currentBytes = 0;
-          realSplitVolume = splitVolume;
-        }
-
-        currentBytes += eachPage.getSecond();
-      }
-
-      //add last
-      if (currentBytes > 0) {
-        splits.add(new Pair(currentOffset, currentBytes));
-      }
-      return splits;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
deleted file mode 100644
index d88173f..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ /dev/null
@@ -1,443 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoProtos.TaskAttemptState;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
-import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
-import org.apache.tajo.master.querymaster.QueryUnit.PullHost;
-import org.apache.tajo.master.container.TajoContainerId;
-
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
-
-public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
-
-  private static final Log LOG = LogFactory.getLog(QueryUnitAttempt.class);
-
-  private final static int EXPIRE_TIME = 15000;
-
-  private final QueryUnitAttemptId id;
-  private final QueryUnit queryUnit;
-  final EventHandler eventHandler;
-
-  private TajoContainerId containerId;
-  private WorkerConnectionInfo workerConnectionInfo;
-  private int expire;
-
-  private final Lock readLock;
-  private final Lock writeLock;
-
-  private final List<String> diagnostics = new ArrayList<String>();
-
-  private final QueryUnitAttemptScheduleContext scheduleContext;
-
-  private float progress;
-  private CatalogProtos.TableStatsProto inputStats;
-  private CatalogProtos.TableStatsProto resultStats;
-
-  protected static final StateMachineFactory
-      <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
-      stateMachineFactory = new StateMachineFactory
-      <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
-      (TaskAttemptState.TA_NEW)
-
-      // Transitions from TA_NEW state
-      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
-          TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition())
-      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
-          TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition())
-      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_KILLED,
-          TaskAttemptEventType.TA_KILL,
-          new TaskKilledCompleteTransition())
-
-      // Transitions from TA_UNASSIGNED state
-      .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED,
-          TaskAttemptEventType.TA_ASSIGNED,
-          new LaunchTransition())
-      .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_KILL_WAIT,
-          TaskAttemptEventType.TA_KILL,
-          new KillUnassignedTaskTransition())
-
-      // Transitions from TA_ASSIGNED state
-      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED,
-          TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition())
-      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT,
-          TaskAttemptEventType.TA_KILL,
-          new KillTaskTransition())
-      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILLED,
-          TaskAttemptEventType.TA_KILL,
-          new KillTaskTransition())
-      .addTransition(TaskAttemptState.TA_ASSIGNED,
-          EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
-          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
-      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED,
-          TaskAttemptEventType.TA_DONE, new SucceededTransition())
-      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED,
-          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
-
-      // Transitions from TA_RUNNING state
-      .addTransition(TaskAttemptState.TA_RUNNING,
-          EnumSet.of(TaskAttemptState.TA_RUNNING),
-          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
-      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILL_WAIT,
-          TaskAttemptEventType.TA_KILL,
-          new KillTaskTransition())
-      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED,
-          TaskAttemptEventType.TA_DONE, new SucceededTransition())
-      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED,
-          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
-
-      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
-          TaskAttemptEventType.TA_LOCAL_KILLED,
-          new TaskKilledCompleteTransition())
-      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
-          TaskAttemptEventType.TA_ASSIGNED,
-          new KillTaskTransition())
-      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
-          TaskAttemptEventType.TA_SCHEDULE_CANCELED,
-          new TaskKilledCompleteTransition())
-      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
-          TaskAttemptEventType.TA_DONE,
-          new TaskKilledCompleteTransition())
-      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_FAILED,
-          TaskAttemptEventType.TA_FATAL_ERROR)
-      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
-          EnumSet.of(
-              TaskAttemptEventType.TA_KILL,
-              TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
-              TaskAttemptEventType.TA_UPDATE))
-
-      // Transitions from TA_SUCCEEDED state
-      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
-          TaskAttemptEventType.TA_UPDATE)
-      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
-          TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition())
-      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED,
-          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
-       // Ignore-able transitions
-      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
-          TaskAttemptEventType.TA_KILL)
-
-      // Transitions from TA_KILLED state
-      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
-          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE)
-      // Ignore-able transitions
-      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
-          EnumSet.of(
-              TaskAttemptEventType.TA_UPDATE))
-      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
-          EnumSet.of(
-              TaskAttemptEventType.TA_LOCAL_KILLED,
-              TaskAttemptEventType.TA_KILL,
-              TaskAttemptEventType.TA_ASSIGNED,
-              TaskAttemptEventType.TA_DONE),
-          new TaskKilledCompleteTransition())
-      .installTopology();
-
-  private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
-    stateMachine;
-
-
-  public QueryUnitAttempt(final QueryUnitAttemptScheduleContext scheduleContext,
-                          final QueryUnitAttemptId id, final QueryUnit queryUnit,
-                          final EventHandler eventHandler) {
-    this.scheduleContext = scheduleContext;
-    this.id = id;
-    this.expire = QueryUnitAttempt.EXPIRE_TIME;
-    this.queryUnit = queryUnit;
-    this.eventHandler = eventHandler;
-
-    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-    this.readLock = readWriteLock.readLock();
-    this.writeLock = readWriteLock.writeLock();
-
-    stateMachine = stateMachineFactory.make(this);
-  }
-
-  public TaskAttemptState getState() {
-    readLock.lock();
-    try {
-      return stateMachine.getCurrentState();
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  public QueryUnitAttemptId getId() {
-    return this.id;
-  }
-
-  public boolean isLeafTask() {
-    return this.queryUnit.isLeafTask();
-  }
-
-  public QueryUnit getQueryUnit() {
-    return this.queryUnit;
-  }
-
-  public WorkerConnectionInfo getWorkerConnectionInfo() {
-    return this.workerConnectionInfo;
-  }
-
-  public void setContainerId(TajoContainerId containerId) {
-    this.containerId = containerId;
-  }
-
-  public synchronized void setExpireTime(int expire) {
-    this.expire = expire;
-  }
-
-  public synchronized void updateExpireTime(int period) {
-    this.setExpireTime(this.expire - period);
-  }
-
-  public synchronized void resetExpireTime() {
-    this.setExpireTime(QueryUnitAttempt.EXPIRE_TIME);
-  }
-
-  public int getLeftTime() {
-    return this.expire;
-  }
-
-  public float getProgress() {
-    return progress;
-  }
-
-  public TableStats getInputStats() {
-    if (inputStats == null) {
-      return null;
-    }
-
-    return new TableStats(inputStats);
-  }
-
-  public TableStats getResultStats() {
-    if (resultStats == null) {
-      return null;
-    }
-    return new TableStats(resultStats);
-  }
-
-  private void fillTaskStatistics(TaskCompletionReport report) {
-    this.progress = 1.0f;
-
-    List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
-
-    if (report.getShuffleFileOutputsCount() > 0) {
-      this.getQueryUnit().setShuffleFileOutputs(report.getShuffleFileOutputsList());
-
-      PullHost host = new PullHost(getWorkerConnectionInfo().getHost(), getWorkerConnectionInfo().getPullServerPort());
-      for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) {
-        IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(),
-            getId().getId(), p.getPartId(), host, p.getVolume());
-        partitions.add(entry);
-      }
-    }
-    this.getQueryUnit().setIntermediateData(partitions);
-
-    if (report.hasInputStats()) {
-      this.inputStats = report.getInputStats();
-    }
-    if (report.hasResultStats()) {
-      this.resultStats = report.getResultStats();
-      this.getQueryUnit().setStats(new TableStats(resultStats));
-    }
-  }
-
-  private static class TaskAttemptScheduleTransition implements
-      SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
-
-    @Override
-    public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
-      taskAttempt.eventHandler.handle(new QueryUnitAttemptScheduleEvent(
-          EventType.T_SCHEDULE, taskAttempt.getQueryUnit().getId().getExecutionBlockId(),
-          taskAttempt.scheduleContext, taskAttempt));
-    }
-  }
-
-  private static class KillUnassignedTaskTransition implements
-      SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
-
-    @Override
-    public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
-      taskAttempt.eventHandler.handle(new QueryUnitAttemptScheduleEvent(
-          EventType.T_SCHEDULE_CANCEL, taskAttempt.getQueryUnit().getId().getExecutionBlockId(),
-          taskAttempt.scheduleContext, taskAttempt));
-    }
-  }
-
-  private static class LaunchTransition
-      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
-
-    @Override
-    public void transition(QueryUnitAttempt taskAttempt,
-                           TaskAttemptEvent event) {
-      TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
-      taskAttempt.containerId = castEvent.getContainerId();
-      taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo();
-      taskAttempt.eventHandler.handle(
-          new TaskTAttemptEvent(taskAttempt.getId(),
-              TaskEventType.T_ATTEMPT_LAUNCHED));
-    }
-  }
-
-  private static class TaskKilledCompleteTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
-
-    @Override
-    public void transition(QueryUnitAttempt taskAttempt,
-                           TaskAttemptEvent event) {
-      taskAttempt.getQueryUnit().handle(new TaskEvent(taskAttempt.getId().getQueryUnitId(),
-          TaskEventType.T_ATTEMPT_KILLED));
-      LOG.info(taskAttempt.getId() + " Received TA_KILLED Status from LocalTask");
-    }
-  }
-
-  private static class StatusUpdateTransition
-      implements MultipleArcTransition<QueryUnitAttempt, TaskAttemptEvent, TaskAttemptState> {
-
-    @Override
-    public TaskAttemptState transition(QueryUnitAttempt taskAttempt,
-                                       TaskAttemptEvent event) {
-      TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event;
-
-      taskAttempt.progress = updateEvent.getStatus().getProgress();
-      taskAttempt.inputStats = updateEvent.getStatus().getInputStats();
-      taskAttempt.resultStats = updateEvent.getStatus().getResultStats();
-
-      return TaskAttemptState.TA_RUNNING;
-    }
-  }
-
-  private void addDiagnosticInfo(String diag) {
-    if (diag != null && !diag.equals("")) {
-      diagnostics.add(diag);
-    }
-  }
-
-  private static class AlreadyAssignedTransition
-      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
-
-    @Override
-    public void transition(QueryUnitAttempt queryUnitAttempt,
-                           TaskAttemptEvent taskAttemptEvent) {
-    }
-  }
-
-  private static class AlreadyDoneTransition
-      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
-
-    @Override
-    public void transition(QueryUnitAttempt queryUnitAttempt,
-                           TaskAttemptEvent taskAttemptEvent) {
-    }
-  }
-
-  private static class SucceededTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
-    @Override
-    public void transition(QueryUnitAttempt taskAttempt,
-                           TaskAttemptEvent event) {
-      TaskCompletionReport report = ((TaskCompletionEvent)event).getReport();
-
-      try {
-        taskAttempt.fillTaskStatistics(report);
-        taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED));
-      } catch (Throwable t) {
-        taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage()));
-        taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t));
-      }
-    }
-  }
-
-  private static class KillTaskTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
-
-    @Override
-    public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent event) {
-      taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), taskAttempt.containerId,
-          LocalTaskEventType.KILL));
-    }
-  }
-
-  private static class FailedTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
-    @Override
-    public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent event) {
-      TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
-      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED));
-      taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
-      LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost()
-          + " >> " + errorEvent.errorMessage());
-    }
-  }
-
-  @Override
-  public void handle(TaskAttemptEvent event) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing " + event.getTaskAttemptId() + " of type " + event.getType());
-    }
-    try {
-      writeLock.lock();
-      TaskAttemptState oldState = getState();
-      try {
-        stateMachine.doTransition(event.getType(), event);
-      } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")"
-            + ", eventType:" + event.getType().name()
-            + ", oldState:" + oldState.name()
-            + ", nextState:" + getState().name()
-            , e);
-        eventHandler.handle(
-            new SubQueryDiagnosticsUpdateEvent(event.getTaskAttemptId().getQueryUnitId().getExecutionBlockId(),
-                "Can't handle this event at current state of " + event.getTaskAttemptId() + ")"));
-        eventHandler.handle(
-            new SubQueryEvent(event.getTaskAttemptId().getQueryUnitId().getExecutionBlockId(),
-                SubQueryEventType.SQ_INTERNAL_ERROR));
-      }
-
-      //notify the eventhandler of state change
-      if (LOG.isDebugEnabled()) {
-       if (oldState != getState()) {
-          LOG.debug(id + " TaskAttempt Transitioned from " + oldState + " to "
-              + getState());
-        }
-      }
-    }
-
-    finally {
-      writeLock.unlock();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index a240ace..cf6b917 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -44,7 +44,7 @@ import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
 import org.apache.tajo.master.TaskSchedulerContext;
-import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+import org.apache.tajo.master.querymaster.Task.IntermediateEntry;
 import org.apache.tajo.plan.logical.SortNode.SortPurpose;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.PlanningException;
@@ -717,7 +717,7 @@ public class Repartitioner {
     List<ExecutionBlock> childBlocks = masterPlan.getChilds(subQuery.getId());
     for (ExecutionBlock childBlock : childBlocks) {
       SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId());
-      for (QueryUnit qu : childExecSM.getQueryUnits()) {
+      for (Task qu : childExecSM.getTasks()) {
         for (IntermediateEntry p : qu.getIntermediateData()) {
           FetchImpl fetch = new FetchImpl(p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0);
           fetch.addPart(p.getTaskId(), p.getAttemptId());
@@ -840,8 +840,8 @@ public class Repartitioner {
       // make FetchImpl per PullServer, PartId
       Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions);
       for (Entry<Integer, List<IntermediateEntry>> interm : hashed.entrySet()) {
-        Map<QueryUnit.PullHost, List<IntermediateEntry>> hashedByHost = hashByHost(interm.getValue());
-        for (Entry<QueryUnit.PullHost, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
+        Map<Task.PullHost, List<IntermediateEntry>> hashedByHost = hashByHost(interm.getValue());
+        for (Entry<Task.PullHost, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
 
           FetchImpl fetch = new FetchImpl(e.getKey(), channel.getShuffleType(),
               block.getId(), interm.getKey(), e.getValue());
@@ -1191,10 +1191,10 @@ public class Repartitioner {
     return hashed;
   }
 
-  public static Map<QueryUnit.PullHost, List<IntermediateEntry>> hashByHost(List<IntermediateEntry> entries) {
-    Map<QueryUnit.PullHost, List<IntermediateEntry>> hashed = new HashMap<QueryUnit.PullHost, List<IntermediateEntry>>();
+  public static Map<Task.PullHost, List<IntermediateEntry>> hashByHost(List<IntermediateEntry> entries) {
+    Map<Task.PullHost, List<IntermediateEntry>> hashed = new HashMap<Task.PullHost, List<IntermediateEntry>>();
 
-    QueryUnit.PullHost host;
+    Task.PullHost host;
     for (IntermediateEntry entry : entries) {
       host = entry.getPullHost();
       if (hashed.containsKey(host)) {