You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/06/08 09:53:15 UTC

[1/3] tajo git commit: TAJO-1615: Implement TaskManager. (jinho)

Repository: tajo
Updated Branches:
  refs/heads/master dfcf41d6f -> 36da0dac7


http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
index 5c97ba8..16d32d4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
@@ -36,6 +36,7 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskRunnerHistoryProto;
 /**
  * The history class for TaskRunner processing.
  */
+@Deprecated
 public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> {
 
   private Service.STATE state;

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 734a8a5..d18a262 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -36,6 +36,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+@Deprecated
 public class TaskRunnerManager extends CompositeService implements EventHandler<TaskRunnerEvent> {
   private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class);
 
@@ -154,14 +155,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
 
       if(context == null){
         try {
-          context = new ExecutionBlockContext(getTajoConf(),
-              getWorkerContext(),
-              this,
-              startEvent.getQueryContext(),
-              startEvent.getPlan(),
-              startEvent.getExecutionBlockId(),
-              startEvent.getQueryMaster(),
-              startEvent.getShuffleType());
+          context = new ExecutionBlockContext(getWorkerContext(), this, startEvent.getRequest());
           context.init();
         } catch (Throwable e) {
           LOG.fatal(e.getMessage(), e);
@@ -170,7 +164,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
         executionBlockContextMap.put(event.getExecutionBlockId(), context);
       }
 
-      TaskRunner taskRunner = new TaskRunner(context, startEvent.getContainerId());
+      TaskRunner taskRunner = new TaskRunner(context, startEvent.getRequest().getContainerId());
       LOG.info("Start TaskRunner:" + taskRunner.getId());
       taskRunnerMap.put(taskRunner.getId(), taskRunner);
       taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getHistory());

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java
new file mode 100644
index 0000000..85d74e2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+
+public class ExecutionBlockStartEvent extends TaskManagerEvent {
+  private TajoWorkerProtocol.RunExecutionBlockRequestProto requestProto;
+
+  public ExecutionBlockStartEvent(TajoWorkerProtocol.RunExecutionBlockRequestProto requestProto) {
+    super(EventType.EB_START, new ExecutionBlockId(requestProto.getExecutionBlockId()));
+    this.requestProto = requestProto;
+  }
+
+  public TajoWorkerProtocol.RunExecutionBlockRequestProto getRequestProto() {
+    return requestProto;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java
new file mode 100644
index 0000000..2b967ab
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+
+public class ExecutionBlockStopEvent extends TaskManagerEvent {
+  private TajoWorkerProtocol.ExecutionBlockListProto cleanupList;
+
+  public ExecutionBlockStopEvent(TajoIdProtos.ExecutionBlockIdProto executionBlockId,
+                                 TajoWorkerProtocol.ExecutionBlockListProto cleanupList) {
+    super(EventType.EB_STOP, new ExecutionBlockId(executionBlockId));
+    this.cleanupList = cleanupList;
+  }
+
+  public TajoWorkerProtocol.ExecutionBlockListProto getCleanupList() {
+    return cleanupList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
index 2f411e8..9a1c106 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
@@ -24,7 +24,7 @@ import com.google.protobuf.RpcCallback;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationRequestProto;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationResponseProto;
 
-public class NodeResourceAllocateEvent extends NodeResourceManagerEvent {
+public class NodeResourceAllocateEvent extends NodeResourceEvent {
 
   private BatchAllocationRequestProto request;
   private RpcCallback<BatchAllocationResponseProto> callback;

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
index a298d77..31d9229 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
@@ -21,7 +21,7 @@ package org.apache.tajo.worker.event;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.resource.NodeResource;
 
-public class NodeResourceDeallocateEvent extends NodeResourceManagerEvent {
+public class NodeResourceDeallocateEvent extends NodeResourceEvent {
 
   private NodeResource resource;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java
new file mode 100644
index 0000000..6fd2e0d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class NodeResourceEvent extends AbstractEvent<NodeResourceEvent.EventType> {
+  // consumer: NodeResourceManager
+  public enum EventType {
+    // producer: TajoWorkerManagerService
+    ALLOCATE,
+    // producer: TaskExecutor
+    DEALLOCATE
+  }
+
+  public NodeResourceEvent(EventType eventType) {
+    super(eventType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java
deleted file mode 100644
index bcb3448..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.event;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.resource.NodeResource;
-
-public class NodeResourceManagerEvent extends AbstractEvent<NodeResourceManagerEvent.EventType> {
-  public enum EventType {
-    ALLOCATE,
-    DEALLOCATE
-  }
-
-  public NodeResourceManagerEvent(EventType eventType) {
-    super(eventType);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
index 58ab74a..9eb8ae9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
@@ -22,19 +22,16 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.tajo.resource.NodeResource;
 
 public class NodeStatusEvent extends AbstractEvent<NodeStatusEvent.EventType> {
-  private final NodeResource resource;
 
+  // consumer: NodeStatusUpdater
   public enum EventType {
+    // producer: NodeResourceManager
     REPORT_RESOURCE,
+    // producer: TaskManager
     FLUSH_REPORTS
   }
 
-  public NodeStatusEvent(EventType eventType, NodeResource resource) {
+  public NodeStatusEvent(EventType eventType) {
     super(eventType);
-    this.resource = resource;
-  }
-
-  public NodeResource getResource() {
-    return resource;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java
new file mode 100644
index 0000000..c609c67
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.TaskAttemptId;
+
+public class TaskExecutorEvent extends AbstractEvent<TaskExecutorEvent.EventType> {
+
+  // producer: NodeResourceManager, consumer: TaskExecutor
+  public enum EventType {
+    START,
+    KILL,
+    ABORT
+  }
+
+  private TaskAttemptId taskAttemptId;
+
+  public TaskExecutorEvent(EventType eventType,
+                           TaskAttemptId taskAttemptId) {
+    super(eventType);
+    this.taskAttemptId = taskAttemptId;
+  }
+
+  public TaskAttemptId getTaskId() {
+    return taskAttemptId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java
new file mode 100644
index 0000000..39b541b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.TaskAttemptId;
+
+public class TaskManagerEvent extends AbstractEvent<TaskManagerEvent.EventType> {
+  // producer: NodeResourceManager, consumer: TaskManager
+  public enum EventType {
+    EB_START,
+    EB_STOP
+  }
+
+  private ExecutionBlockId executionBlockId;
+
+  public TaskManagerEvent(EventType eventType,
+                          ExecutionBlockId executionBlockId) {
+    super(eventType);
+    this.executionBlockId = executionBlockId;
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
index aac8973..7175251 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
@@ -21,6 +21,7 @@ package org.apache.tajo.worker.event;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.tajo.ExecutionBlockId;
 
+@Deprecated
 public class TaskRunnerEvent extends AbstractEvent<TaskRunnerEvent.EventType> {
   public enum EventType {
     START,

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
index 908afa2..9406794 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
@@ -20,48 +20,20 @@ package org.apache.tajo.worker.event;
 
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.plan.serder.PlanProto;
-
+@Deprecated
 public class TaskRunnerStartEvent extends TaskRunnerEvent {
 
-  private final QueryContext queryContext;
-  private final WorkerConnectionInfo queryMaster;
-  private final String containerId;
-  private final String plan;
-  private final PlanProto.ShuffleType shuffleType;
-
-  public TaskRunnerStartEvent(WorkerConnectionInfo queryMaster,
-                              ExecutionBlockId executionBlockId,
-                              String containerId,
-                              QueryContext context,
-                              String plan,
-                              PlanProto.ShuffleType shuffleType) {
-    super(EventType.START, executionBlockId);
-    this.queryMaster = queryMaster;
-    this.containerId = containerId;
-    this.queryContext = context;
-    this.plan = plan;
-    this.shuffleType = shuffleType;
-  }
-
-  public WorkerConnectionInfo getQueryMaster() {
-    return queryMaster;
-  }
-
-  public String getContainerId() {
-    return containerId;
-  }
-
-  public QueryContext getQueryContext() {
-    return queryContext;
-  }
+  private final TajoWorkerProtocol.RunExecutionBlockRequestProto request;
 
-  public String getPlan() {
-    return plan;
+  public TaskRunnerStartEvent(TajoWorkerProtocol.RunExecutionBlockRequestProto request) {
+    super(EventType.START, new ExecutionBlockId(request.getExecutionBlockId()));
+    this.request = request;
   }
 
-  public PlanProto.ShuffleType getShuffleType() {
-    return shuffleType;
+  public TajoWorkerProtocol.RunExecutionBlockRequestProto getRequest() {
+    return request;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
index c8ec20d..297f30c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
@@ -20,6 +20,7 @@ package org.apache.tajo.worker.event;
 
 import org.apache.tajo.ExecutionBlockId;
 
+@Deprecated
 public class TaskRunnerStopEvent extends TaskRunnerEvent {
 
   public TaskRunnerStopEvent(ExecutionBlockId executionBlockId) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java
new file mode 100644
index 0000000..f60e7c4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.resource.NodeResource;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProto;
+
+public class TaskStartEvent extends TaskExecutorEvent {
+
+  private NodeResource allocatedResource;
+  private TaskRequestProto taskRequest;
+
+  public TaskStartEvent(TaskRequestProto taskRequest,
+                        NodeResource allocatedResource) {
+    super(EventType.START, new TaskAttemptId(taskRequest.getId()));
+    this.taskRequest = taskRequest;
+    this.allocatedResource = allocatedResource;
+  }
+
+  public NodeResource getAllocatedResource() {
+    return allocatedResource;
+  }
+
+  public TaskRequestProto getTaskRequest() {
+    return taskRequest;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index 2324596..715b1e6 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -208,6 +208,7 @@ message TaskAllocationRequestProto {
 message BatchAllocationRequestProto {
     required ExecutionBlockIdProto executionBlockId = 1;
     repeated TaskAllocationRequestProto taskRequest = 2;
+    optional RunExecutionBlockRequestProto executionBlockRequest = 3;  //TODO should be refactored
 }
 
 message BatchAllocationResponseProto {

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index eca7f6d..0cec3da 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.querymaster;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.tajo.*;
@@ -33,16 +34,25 @@ import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.engine.query.TaskRequestImpl;
-import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.session.Session;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.history.HistoryReader;
+import org.apache.tajo.util.history.HistoryWriter;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
 import org.apache.tajo.worker.ExecutionBlockContext;
-import org.apache.tajo.worker.Task;
+import org.apache.tajo.worker.LegacyTaskImpl;
+import org.apache.tajo.worker.TajoWorker;
+import org.apache.tajo.worker.TaskRunnerManager;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -230,7 +240,7 @@ public class TestKillQuery {
     QueryId qid = LocalTajoTestingUtility.newQueryId();
     ExecutionBlockId eid = QueryIdFactory.newExecutionBlockId(qid, 1);
     TaskId tid = QueryIdFactory.newTaskId(eid);
-    TajoConf conf = new TajoConf();
+    final TajoConf conf = new TajoConf();
     TaskRequestImpl taskRequest = new TaskRequestImpl();
 
     taskRequest.set(null, new ArrayList<CatalogProtos.FragmentProto>(),
@@ -238,18 +248,37 @@ public class TestKillQuery {
     taskRequest.setInterQuery();
     TaskAttemptId attemptId = new TaskAttemptId(tid, 1);
 
+    WorkerConnectionInfo queryMaster = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+    TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder
+        requestProto = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder();
+
+    requestProto.setExecutionBlockId(eid.getProto())
+        .setQueryMaster(queryMaster.getProto())
+        .setNodeId(queryMaster.getHost()+":" + queryMaster.getQueryMasterPort())
+        .setContainerId("test")
+        .setQueryContext(new QueryContext(conf).getProto())
+        .setPlanJson("test")
+        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+
+    TajoWorker.WorkerContext workerContext = new MockWorkerContext() {
+      @Override
+      public TajoConf getConf() {
+        return conf;
+      }
+    };
+
     ExecutionBlockContext context =
-        new ExecutionBlockContext(conf, null, null, new QueryContext(conf), null, eid, null, null);
+        new ExecutionBlockContext(workerContext, null, requestProto.build());
 
-    org.apache.tajo.worker.Task task = new Task("test", CommonTestingUtil.getTestDir(), attemptId,
+    org.apache.tajo.worker.Task task = new LegacyTaskImpl("test", CommonTestingUtil.getTestDir(), attemptId,
         conf, context, taskRequest);
     task.kill();
-    assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus());
+    assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState());
     try {
       task.run();
-      assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus());
+      assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState());
     } catch (Exception e) {
-      assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus());
+      assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState());
     }
   }
 
@@ -271,4 +300,94 @@ public class TestKillQuery {
       super.dispatch(event);
     }
   }
+
+  abstract class MockWorkerContext implements TajoWorker.WorkerContext {
+
+    @Override
+    public QueryMaster getQueryMaster() {
+      return null;
+    }
+
+    public abstract TajoConf getConf();
+
+    @Override
+    public ServiceTracker getServiceTracker() {
+      return null;
+    }
+
+    @Override
+    public QueryMasterManagerService getQueryMasterManagerService() {
+      return null;
+    }
+
+    @Override
+    public TaskRunnerManager getTaskRunnerManager() {
+      return null;
+    }
+
+    @Override
+    public CatalogService getCatalog() {
+      return null;
+    }
+
+    @Override
+    public WorkerConnectionInfo getConnectionInfo() {
+      return null;
+    }
+
+    @Override
+    public String getWorkerName() {
+      return null;
+    }
+
+    @Override
+    public LocalDirAllocator getLocalDirAllocator() {
+      return null;
+    }
+
+    @Override
+    public QueryCoordinatorProtocol.ClusterResourceSummary getClusterResource() {
+      return null;
+    }
+
+    @Override
+    public TajoSystemMetrics getWorkerSystemMetrics() {
+      return null;
+    }
+
+    @Override
+    public HashShuffleAppenderManager getHashShuffleAppenderManager() {
+      return null;
+    }
+
+    @Override
+    public HistoryWriter getTaskHistoryWriter() {
+      return null;
+    }
+
+    @Override
+    public HistoryReader getHistoryReader() {
+      return null;
+    }
+
+    @Override
+    public void cleanup(String strPath) {
+
+    }
+
+    @Override
+    public void cleanupTemporalDirectories() {
+
+    }
+
+    @Override
+    public void setClusterResource(QueryCoordinatorProtocol.ClusterResourceSummary clusterResource) {
+
+    }
+
+    @Override
+    public void setNumClusterNodes(int numClusterNodes) {
+
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
new file mode 100644
index 0000000..9d4e1f3
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+
+import java.io.IOException;
+
+public class MockExecutionBlock extends ExecutionBlockContext {
+  // Test double for ExecutionBlockContext: init() and fatalError() are overridden with empty bodies.
+  public MockExecutionBlock(TajoWorker.WorkerContext workerContext,
+                            TajoWorkerProtocol.RunExecutionBlockRequestProto request) throws IOException {
+    super(workerContext, null, request);
+  }
+
+  @Override
+  public void init() throws Throwable {
+    // skip: the parent's init() is deliberately not called
+  }
+
+  @Override
+  public void fatalError(TaskAttemptId taskAttemptId, String message) {
+    // no-op: both arguments are ignored
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
new file mode 100644
index 0000000..18b9405
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.worker.event.NodeResourceEvent;
+
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+public class MockNodeResourceManager extends NodeResourceManager {
+  // When false, startExecutionBlock() and startTask() are not forwarded to the parent class.
+  volatile boolean enableTaskHandlerEvent = true;
+  private final Semaphore barrier;
+
+  public MockNodeResourceManager(Semaphore barrier, Dispatcher dispatcher, EventHandler taskEventHandler) {
+    super(dispatcher, taskEventHandler);
+    this.barrier = barrier;
+  }
+
+  /** Delegates to the parent handler, then releases one permit on the barrier. */
+  @Override
+  public void handle(NodeResourceEvent event) {
+    super.handle(event);
+    barrier.release();
+  }
+
+  @Override
+  protected void startExecutionBlock(TajoWorkerProtocol.RunExecutionBlockRequestProto request) {
+    if (!enableTaskHandlerEvent) {
+      return; // forwarding disabled by setTaskHandlerEvent(false)
+    }
+    super.startExecutionBlock(request);
+  }
+
+  @Override
+  protected void startTask(TajoWorkerProtocol.TaskRequestProto request, NodeResource resource) {
+    if (!enableTaskHandlerEvent) {
+      return; // forwarding disabled by setTaskHandlerEvent(false)
+    }
+    super.startTask(request, resource);
+  }
+
+  /**
+   * Turns forwarding of startExecutionBlock() and startTask() to the parent on or off (false skips them).
+   */
+  public void setTaskHandlerEvent(boolean flag) {
+    enableTaskHandlerEvent = flag;
+  }
+
+  protected static Queue<TajoWorkerProtocol.TaskAllocationRequestProto> createTaskRequests(
+      ExecutionBlockId ebId, int memory, int size) {
+    // Builds `size` allocation requests under ebId, each with a resource created from `memory`.
+    Queue<TajoWorkerProtocol.TaskAllocationRequestProto> requests =
+        new LinkedBlockingQueue<TajoWorkerProtocol.TaskAllocationRequestProto>();
+    for (int taskSeq = 0; taskSeq < size; taskSeq++) {
+      TaskAttemptId attemptId = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(ebId, taskSeq), 0);
+      TajoWorkerProtocol.TaskRequestProto taskRequest = TajoWorkerProtocol.TaskRequestProto.newBuilder()
+          .setId(attemptId.getProto())
+          .setShouldDie(true)
+          .setOutputTable("")
+          .setPlan(PlanProto.LogicalNodeTree.newBuilder())
+          .setClusteredOutput(false)
+          .build();
+      requests.add(TajoWorkerProtocol.TaskAllocationRequestProto.newBuilder()
+          .setResource(NodeResources.createResource(memory).getProto())
+          .setTaskRequest(taskRequest).build());
+    }
+    return requests;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
index 2d7d0be..dfcfd4f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
@@ -39,9 +39,9 @@ public class MockNodeStatusUpdater extends NodeStatusUpdater {
   private Map<Integer, NodeResource> resources = Maps.newHashMap();
   private MockResourceTracker resourceTracker;
 
-  public MockNodeStatusUpdater(CountDownLatch barrier, WorkerConnectionInfo connectionInfo,
+  public MockNodeStatusUpdater(CountDownLatch barrier, TajoWorker.WorkerContext workerContext,
                                NodeResourceManager resourceManager) {
-    super(connectionInfo, resourceManager);
+    super(workerContext, resourceManager);
     this.barrier = barrier;
     this.resourceTracker = new MockResourceTracker();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
new file mode 100644
index 0000000..f62733f
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.worker.event.TaskExecutorEvent;
+
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+
+public class MockTaskExecutor extends TaskExecutor {
+  // Test double for TaskExecutor: signals a semaphore per event and hands out stub tasks.
+  protected final Semaphore barrier;
+
+  public MockTaskExecutor(Semaphore barrier, TaskManager taskManager, EventHandler rmEventHandler) {
+    super(taskManager, rmEventHandler);
+    this.barrier = barrier;
+  }
+
+  @Override
+  public void handle(TaskExecutorEvent event) {
+    super.handle(event);
+    barrier.release(); // one permit per handled event
+  }
+
+  @Override
+  protected Task createTask(final ExecutionBlockContext context, TajoWorkerProtocol.TaskRequestProto taskRequest) {
+    final TaskAttemptId attemptId = new TaskAttemptId(taskRequest.getId());
+
+    // getState()/setState() keep the state in a local field (original note: ignore status changed log)
+    final TaskAttemptContext attemptContext = new TaskAttemptContext(null, context, attemptId, null, null) {
+      private TajoProtos.TaskAttemptState mockState;
+
+      @Override
+      public TajoProtos.TaskAttemptState getState() {
+        return mockState;
+      }
+
+      @Override
+      public void setState(TajoProtos.TaskAttemptState newState) {
+        mockState = newState;
+      }
+    };
+
+    return new Task() {
+      @Override
+      public void init() throws IOException {
+        // no-op
+      }
+
+      @Override
+      public void fetch() {
+        // no-op
+      }
+
+      @Override
+      public void run() throws Exception {
+        attemptContext.stop();
+        attemptContext.setProgress(1.0f);
+        attemptContext.setState(TajoProtos.TaskAttemptState.TA_SUCCEEDED);
+      }
+
+      @Override
+      public void kill() {
+        // no-op
+      }
+
+      @Override
+      public void abort() {
+        // no-op
+      }
+
+      @Override
+      public void cleanup() {
+        // no-op
+      }
+
+      @Override
+      public boolean hasFetchPhase() {
+        return false;
+      }
+
+      @Override
+      public boolean isProgressChanged() {
+        return false;
+      }
+
+      @Override
+      public boolean isStopped() {
+        return attemptContext.isStopped();
+      }
+
+      @Override
+      public void updateProgress() {
+        // no-op
+      }
+
+      @Override
+      public TaskAttemptContext getTaskContext() {
+        return attemptContext;
+      }
+
+      @Override
+      public ExecutionBlockContext getExecutionBlockContext() {
+        return context;
+      }
+
+      @Override
+      public TajoWorkerProtocol.TaskStatusProto getReport() {
+        // Status snapshot: fixed worker name, the context's id/progress/state, and empty input stats.
+        return TajoWorkerProtocol.TaskStatusProto.newBuilder()
+            .setWorkerName("localhost:0")
+            .setId(attemptContext.getTaskId().getProto())
+            .setProgress(attemptContext.getProgress())
+            .setState(attemptContext.getState())
+            .setInputStats(new TableStats().getProto())
+            .build();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java
new file mode 100644
index 0000000..678b063
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.worker.event.TaskManagerEvent;
+
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+
+public class MockTaskManager extends TaskManager {
+  // Test TaskManager: builds MockExecutionBlock contexts and releases the barrier after each event.
+  private final Semaphore barrier;
+
+  public MockTaskManager(Semaphore barrier, Dispatcher dispatcher, TajoWorker.WorkerContext workerContext, EventHandler rmEventHandler) {
+    super(dispatcher, workerContext, rmEventHandler);
+    this.barrier = barrier;
+  }
+
+  @Override
+  protected ExecutionBlockContext createExecutionBlock(TajoWorkerProtocol.RunExecutionBlockRequestProto request) {
+    try {
+      return new MockExecutionBlock(getWorkerContext(), request);
+    } catch (IOException e) {
+      throw new RuntimeException(e); // rethrown unchecked, keeping the IOException as the cause
+    }
+  }
+
+  @Override
+  protected void stopExecutionBlock(ExecutionBlockContext context,
+                                    TajoWorkerProtocol.ExecutionBlockListProto cleanupList) {
+    // skip for testing: nothing is stopped or cleaned up here
+  }
+
+  @Override
+  public void handle(TaskManagerEvent event) {
+    super.handle(event);
+    barrier.release(); // one permit per handled event
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
new file mode 100644
index 0000000..e8c2b9c
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.querymaster.QueryMaster;
+import org.apache.tajo.querymaster.QueryMasterManagerService;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.util.history.HistoryReader;
+import org.apache.tajo.util.history.HistoryWriter;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
+
+public abstract class MockWorkerContext implements TajoWorker.WorkerContext {
+  TajoSystemMetrics tajoSystemMetrics; // created on the first getWorkerSystemMetrics() call
+
+  @Override
+  public QueryMaster getQueryMaster() {
+    return null;
+  }
+
+  public abstract TajoConf getConf(); // supplied by subclasses; read by getWorkerSystemMetrics()
+
+  @Override
+  public ServiceTracker getServiceTracker() {
+    return null;
+  }
+
+  @Override
+  public QueryMasterManagerService getQueryMasterManagerService() {
+    return null;
+  }
+
+  @Override
+  public TaskRunnerManager getTaskRunnerManager() {
+    return null;
+  }
+
+  @Override
+  public CatalogService getCatalog() {
+    return null;
+  }
+
+  @Override
+  public WorkerConnectionInfo getConnectionInfo() {
+    return null;
+  }
+
+  @Override
+  public String getWorkerName() {
+    return null;
+  }
+
+  @Override
+  public LocalDirAllocator getLocalDirAllocator() {
+    return null;
+  }
+
+  @Override
+  public QueryCoordinatorProtocol.ClusterResourceSummary getClusterResource() {
+    return null;
+  }
+
+  @Override
+  public TajoSystemMetrics getWorkerSystemMetrics() {
+    // Lazily create and start the metrics on the first call; later calls return the same instance.
+    if (tajoSystemMetrics == null) {
+      tajoSystemMetrics = new TajoSystemMetrics(getConf(), "test-file-group", "localhost");
+      tajoSystemMetrics.start();
+    }
+    return tajoSystemMetrics;
+  }
+
+  @Override
+  public HashShuffleAppenderManager getHashShuffleAppenderManager() {
+    return null;
+  }
+
+  @Override
+  public HistoryWriter getTaskHistoryWriter() {
+    return null;
+  }
+
+  @Override
+  public HistoryReader getHistoryReader() {
+    return null;
+  }
+
+  @Override
+  public void cleanup(String strPath) {
+    // no-op: the path argument is ignored
+  }
+
+  @Override
+  public void cleanupTemporalDirectories() {
+    // no-op
+  }
+
+  @Override
+  public void setClusterResource(QueryCoordinatorProtocol.ClusterResourceSummary clusterResource) {
+    // no-op: the value is not stored, getClusterResource() keeps returning null
+  }
+
+  @Override
+  public void setNumClusterNodes(int numClusterNodes) {
+    // no-op: the value is not stored
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index 513eb69..65627c1 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -104,13 +104,13 @@ public class TestFetcher {
 
   @Test
   public void testAdjustFetchProcess() {
-    assertEquals(0.0f, Task.adjustFetchProcess(0, 0), 0);
-    assertEquals(0.0f, Task.adjustFetchProcess(10, 10), 0);
-    assertEquals(0.05f, Task.adjustFetchProcess(10, 9), 0);
-    assertEquals(0.1f, Task.adjustFetchProcess(10, 8), 0);
-    assertEquals(0.25f, Task.adjustFetchProcess(10, 5), 0);
-    assertEquals(0.45f, Task.adjustFetchProcess(10, 1), 0);
-    assertEquals(0.5f, Task.adjustFetchProcess(10, 0), 0);
+    assertEquals(0.0f, LegacyTaskImpl.adjustFetchProcess(0, 0), 0);
+    assertEquals(0.0f, LegacyTaskImpl.adjustFetchProcess(10, 10), 0);
+    assertEquals(0.05f, LegacyTaskImpl.adjustFetchProcess(10, 9), 0);
+    assertEquals(0.1f, LegacyTaskImpl.adjustFetchProcess(10, 8), 0);
+    assertEquals(0.25f, LegacyTaskImpl.adjustFetchProcess(10, 5), 0);
+    assertEquals(0.45f, LegacyTaskImpl.adjustFetchProcess(10, 1), 0);
+    assertEquals(0.5f, LegacyTaskImpl.adjustFetchProcess(10, 0), 0);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
index 7407acc..2cee7d0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
@@ -19,13 +19,15 @@
 package org.apache.tajo.worker;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.tajo.*;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.plan.serder.PlanProto;
-import org.apache.tajo.resource.NodeResources;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
@@ -42,9 +44,15 @@ import static org.junit.Assert.*;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
 public class TestNodeResourceManager {
 
-  private NodeResourceManager resourceManager;
-  private MockNodeStatusUpdater statusUpdater;
+  private MockNodeResourceManager resourceManager;
+  private NodeStatusUpdater statusUpdater;
+  private TaskManager taskManager;
+  private TaskExecutor taskExecutor;
   private AsyncDispatcher dispatcher;
+  private AsyncDispatcher taskDispatcher;
+  private TajoWorker.WorkerContext workerContext;
+
+  private CompositeService service;
   private int taskMemory;
   private TajoConf conf;
 
@@ -61,29 +69,55 @@ public class TestNodeResourceManager {
     conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1);
 
     dispatcher = new AsyncDispatcher();
-    dispatcher.init(conf);
-    dispatcher.start();
-
-    resourceManager = new NodeResourceManager(dispatcher);
-    resourceManager.init(conf);
-    resourceManager.start();
-
-    WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
-    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), worker, resourceManager);
-    statusUpdater.init(conf);
-    statusUpdater.start();
+    taskDispatcher = new AsyncDispatcher();
+
+    workerContext = new MockWorkerContext() {
+      WorkerConnectionInfo workerConnectionInfo;
+      @Override
+      public TajoConf getConf() {
+        return conf;
+      }
+
+      @Override
+      public WorkerConnectionInfo getConnectionInfo() {
+        if (workerConnectionInfo == null) {
+          workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+        }
+        return workerConnectionInfo;
+      }
+    };
+
+    taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext, dispatcher.getEventHandler());
+    taskExecutor = new MockTaskExecutor(new Semaphore(0), taskManager, dispatcher.getEventHandler());
+    resourceManager = new MockNodeResourceManager(new Semaphore(0), dispatcher, taskDispatcher.getEventHandler());
+    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext, resourceManager);
+
+    service = new CompositeService("MockService") {
+      @Override
+      protected void serviceInit(Configuration conf) throws Exception {
+        addIfService(dispatcher);
+        addIfService(taskDispatcher);
+        addIfService(taskManager);
+        addIfService(taskExecutor);
+        addIfService(resourceManager);
+        addIfService(statusUpdater);
+        super.serviceInit(conf);
+      }
+    };
+
+    service.init(conf);
+    service.start();
   }
 
   @After
   public void tearDown() {
-    resourceManager.stop();
-    statusUpdater.stop();
-    dispatcher.stop();
+    service.stop();
   }
 
   @Test
   public void testNodeResourceAllocateEvent() throws Exception {
     int requestSize = 4;
+    resourceManager.setTaskHandlerEvent(false); //skip task execution
 
     CallFuture<BatchAllocationResponseProto> callFuture  = new CallFuture<BatchAllocationResponseProto>();
     BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
@@ -91,14 +125,14 @@ public class TestNodeResourceManager {
     requestProto.setExecutionBlockId(ebId.getProto());
 
     assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
-    requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize));
+    requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize));
 
     dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
 
     BatchAllocationResponseProto responseProto = callFuture.get();
     assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    // allocated all
     assertEquals(0, responseProto.getCancellationTaskCount());
-    assertEquals(requestSize, resourceManager.getAllocatedSize());
   }
 
 
@@ -106,6 +140,7 @@ public class TestNodeResourceManager {
   public void testNodeResourceCancellation() throws Exception {
     int requestSize = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
     int overSize = 10;
+    resourceManager.setTaskHandlerEvent(false); //skip task execution
 
     CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>();
     BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
@@ -113,18 +148,19 @@ public class TestNodeResourceManager {
     requestProto.setExecutionBlockId(ebId.getProto());
 
     assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
-    requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize + overSize));
+    requestProto.addAllTaskRequest(
+        MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize + overSize));
 
     dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
     BatchAllocationResponseProto responseProto = callFuture.get();
 
     assertEquals(overSize, responseProto.getCancellationTaskCount());
-    assertEquals(requestSize, resourceManager.getAllocatedSize());
   }
 
   @Test
   public void testNodeResourceDeallocateEvent() throws Exception {
     int requestSize = 4;
+    resourceManager.setTaskHandlerEvent(false); //skip task execution
 
     CallFuture<BatchAllocationResponseProto> callFuture  = new CallFuture<BatchAllocationResponseProto>();
     BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
@@ -132,21 +168,20 @@ public class TestNodeResourceManager {
     requestProto.setExecutionBlockId(ebId.getProto());
 
     assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
-    requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize));
+    requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize));
 
     dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
 
     BatchAllocationResponseProto responseProto = callFuture.get();
     assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
     assertEquals(0, responseProto.getCancellationTaskCount());
-    assertEquals(requestSize, resourceManager.getAllocatedSize());
 
     //deallocate
     for(TaskAllocationRequestProto allocationRequestProto : requestProto.getTaskRequestList()) {
       // direct invoke handler for testing
       resourceManager.handle(new NodeResourceDeallocateEvent(allocationRequestProto.getResource()));
     }
-    assertEquals(0, resourceManager.getAllocatedSize());
+
     assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
   }
 
@@ -154,12 +189,38 @@ public class TestNodeResourceManager {
   public void testParallelRequest() throws Exception {
     final int parallelCount = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES) * 2;
     final int taskSize = 100000;
+    resourceManager.setTaskHandlerEvent(true);
+
     final AtomicInteger totalComplete = new AtomicInteger();
     final AtomicInteger totalCanceled = new AtomicInteger();
 
     final ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
-    final Queue<TaskAllocationRequestProto> totalTasks = createTaskRequests(taskMemory, taskSize);
+    final Queue<TaskAllocationRequestProto>
+        totalTasks = MockNodeResourceManager.createTaskRequests(ebId, taskMemory, taskSize);
+
+    // first request with starting ExecutionBlock
+    TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder
+        ebRequestProto = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder();
+    ebRequestProto.setExecutionBlockId(ebId.getProto())
+        .setQueryMaster(workerContext.getConnectionInfo().getProto())
+        .setNodeId(workerContext.getConnectionInfo().getHost() + ":" +
+            workerContext.getConnectionInfo().getQueryMasterPort())
+        .setContainerId("test")
+        .setQueryContext(new QueryContext(conf).getProto())
+        .setPlanJson("test")
+        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+
+    TaskAllocationRequestProto task = totalTasks.poll();
+    BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+    requestProto.addTaskRequest(task);
+    requestProto.setExecutionBlockId(ebId.getProto());
+    requestProto.setExecutionBlockRequest(ebRequestProto.build());
+    CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>();
+    dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+    assertTrue(callFuture.get().getCancellationTaskCount() == 0);
+    totalComplete.incrementAndGet();
 
+    // start parallel request
     ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
     List<Future> futureList = Lists.newArrayList();
 
@@ -187,7 +248,6 @@ public class TestNodeResourceManager {
                     totalCanceled.addAndGet(proto.getCancellationTaskCount());
                   } else {
                     complete++;
-                    dispatcher.getEventHandler().handle(new NodeResourceDeallocateEvent(task.getResource()));
                   }
                 } catch (Exception e) {
                   fail(e.getMessage());
@@ -209,27 +269,4 @@ public class TestNodeResourceManager {
     executor.shutdown();
     assertEquals(taskSize, totalComplete.get());
   }
-
-  protected static Queue<TaskAllocationRequestProto> createTaskRequests(int memory, int size) {
-    Queue<TaskAllocationRequestProto> requestProtoList = new LinkedBlockingQueue<TaskAllocationRequestProto>();
-    for (int i = 0; i < size; i++) {
-
-      ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
-      TaskAttemptId taskAttemptId = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, i), 0);
-
-      TajoWorkerProtocol.TaskRequestProto.Builder builder =
-          TajoWorkerProtocol.TaskRequestProto.newBuilder();
-      builder.setId(taskAttemptId.getProto());
-      builder.setShouldDie(true);
-      builder.setOutputTable("");
-      builder.setPlan(PlanProto.LogicalNodeTree.newBuilder());
-      builder.setClusteredOutput(false);
-
-
-      requestProtoList.add(TaskAllocationRequestProto.newBuilder()
-          .setResource(NodeResources.createResource(memory).getProto())
-          .setTaskRequest(builder.build()).build());
-    }
-    return requestProtoList;
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
index fb3c77e..af40554 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.rm.Worker;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.worker.event.NodeStatusEvent;
 import org.junit.After;
@@ -37,18 +38,36 @@ public class TestNodeStatusUpdater {
   private MockNodeStatusUpdater statusUpdater;
   private AsyncDispatcher dispatcher;
   private TajoConf conf;
+  private TajoWorker.WorkerContext workerContext;
+
 
   @Before
   public void setup() {
     conf = new TajoConf();
     conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+    workerContext = new MockWorkerContext() {
+      WorkerConnectionInfo workerConnectionInfo;
+
+      @Override
+      public TajoConf getConf() {
+        return conf;
+      }
+
+      @Override
+      public WorkerConnectionInfo getConnectionInfo() {
+        if (workerConnectionInfo == null) {
+          workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+        }
+        return workerConnectionInfo;
+      }
+    };
 
     conf.setIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_INTERVAL, 1000);
     dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
 
-    resourceManager = new NodeResourceManager(dispatcher);
+    resourceManager = new NodeResourceManager(dispatcher, null);
     resourceManager.init(conf);
     resourceManager.start();
   }
@@ -63,27 +82,25 @@ public class TestNodeStatusUpdater {
   @Test(timeout = 20000)
   public void testNodeMembership() throws Exception {
     CountDownLatch barrier = new CountDownLatch(1);
-    WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
-    statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager);
+    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, resourceManager);
     statusUpdater.init(conf);
     statusUpdater.start();
 
     MockNodeStatusUpdater.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker();
     barrier.await();
 
-    assertTrue(resourceTracker.getTotalResource().containsKey(worker.getId()));
+    assertTrue(resourceTracker.getTotalResource().containsKey(workerContext.getConnectionInfo().getId()));
     assertEquals(resourceManager.getTotalResource(),
-        resourceTracker.getTotalResource().get(worker.getId()));
+        resourceTracker.getTotalResource().get(workerContext.getConnectionInfo().getId()));
 
     assertEquals(resourceManager.getAvailableResource(),
-        resourceTracker.getAvailableResource().get(worker.getId()));
+        resourceTracker.getAvailableResource().get(workerContext.getConnectionInfo().getId()));
   }
 
   @Test(timeout = 20000)
   public void testPing() throws Exception {
     CountDownLatch barrier = new CountDownLatch(2);
-    WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
-    statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager);
+    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, resourceManager);
     statusUpdater.init(conf);
     statusUpdater.start();
 
@@ -100,16 +117,29 @@ public class TestNodeStatusUpdater {
   @Test(timeout = 20000)
   public void testResourceReport() throws Exception {
     CountDownLatch barrier = new CountDownLatch(2);
-    WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
-    statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager);
+    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, resourceManager);
     statusUpdater.init(conf);
     statusUpdater.start();
 
+    assertEquals(0, statusUpdater.getQueueSize());
     for (int i = 0; i < statusUpdater.getQueueingLimit(); i++) {
-      dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE,
-          resourceManager.getAvailableResource()));
+      dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE));
     }
     barrier.await();
     assertEquals(0, statusUpdater.getQueueSize());
   }
+
+  @Test(timeout = 20000)
+  public void testFlushResourceReport() throws Exception {
+    CountDownLatch barrier = new CountDownLatch(2);
+    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, resourceManager);
+    statusUpdater.init(conf);
+    statusUpdater.start();
+
+    assertEquals(0, statusUpdater.getQueueSize());
+    dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS));
+
+    barrier.await();
+    assertEquals(0, statusUpdater.getQueueSize());
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
new file mode 100644
index 0000000..98b187b
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.*;
+import org.apache.tajo.annotation.ThreadSafe;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.querymaster.QueryMaster;
+import org.apache.tajo.querymaster.QueryMasterManagerService;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.history.HistoryReader;
+import org.apache.tajo.util.history.HistoryWriter;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
+import org.apache.tajo.worker.event.ExecutionBlockStartEvent;
+import org.apache.tajo.worker.event.ExecutionBlockStopEvent;
+import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.junit.Assert.*;
+
+public class TestTaskExecutor {
+
+  private NodeResourceManager resourceManager;
+  private NodeStatusUpdater statusUpdater;
+  private TaskManager taskManager;
+  private TaskExecutor taskExecutor;
+  private AsyncDispatcher dispatcher;
+  private AsyncDispatcher taskDispatcher;
+  private TajoWorker.WorkerContext workerContext;
+
+  private CompositeService service;
+  private TajoConf conf;
+  private Semaphore barrier;
+  private Semaphore resourceManagerBarrier;
+
+  @Before
+  public void setup() {
+    conf = new TajoConf();
+    conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+    dispatcher = new AsyncDispatcher();
+    taskDispatcher = new AsyncDispatcher();
+
+    workerContext = new MockWorkerContext() {
+      WorkerConnectionInfo workerConnectionInfo;
+
+      @Override
+      public TajoConf getConf() {
+        return conf;
+      }
+
+      @Override
+      public WorkerConnectionInfo getConnectionInfo() {
+        if (workerConnectionInfo == null) {
+          workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+        }
+        return workerConnectionInfo;
+      }
+    };
+
+    barrier = new Semaphore(0);
+    resourceManagerBarrier = new Semaphore(0);
+    taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext, dispatcher.getEventHandler());
+    taskExecutor = new TaskExecutor(barrier, taskManager, dispatcher.getEventHandler());
+    resourceManager = new MockNodeResourceManager(resourceManagerBarrier, dispatcher, taskDispatcher.getEventHandler());
+    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext, resourceManager);
+
+    service = new CompositeService("MockService") {
+      @Override
+      protected void serviceInit(Configuration conf) throws Exception {
+        addIfService(dispatcher);
+        addIfService(taskDispatcher);
+        addIfService(taskManager);
+        addIfService(taskExecutor);
+        addIfService(resourceManager);
+        addIfService(statusUpdater);
+        super.serviceInit(conf);
+      }
+
+
+      @Override
+      protected void serviceStop() throws Exception {
+        workerContext.getWorkerSystemMetrics().stop();
+        super.serviceStop();
+      }
+    };
+
+    service.init(conf);
+    service.start();
+  }
+
+  @After
+  public void tearDown() {
+    service.stop();
+  }
+
+  @Test
+  public void testTaskRequest() throws Exception {
+    int requestSize = 1;
+
+    RunExecutionBlockRequestProto.Builder
+        ebRequestProto = RunExecutionBlockRequestProto.newBuilder();
+    QueryId qid = LocalTajoTestingUtility.newQueryId();
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1);
+
+    ebRequestProto.setExecutionBlockId(ebId.getProto())
+        .setQueryMaster(workerContext.getConnectionInfo().getProto())
+        .setNodeId(workerContext.getConnectionInfo().getHost() + ":"
+            + workerContext.getConnectionInfo().getQueryMasterPort())
+        .setContainerId("test")
+        .setQueryContext(new QueryContext(conf).getProto())
+        .setPlanJson("test")
+        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+
+    CallFuture<BatchAllocationResponseProto> callFuture  = new CallFuture<BatchAllocationResponseProto>();
+    BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+    requestProto.setExecutionBlockId(ebId.getProto());
+    requestProto.setExecutionBlockRequest(ebRequestProto.build());
+
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, 10, requestSize));
+
+    dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+
+    //verify running task
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertEquals(1, taskExecutor.getRunningTasks());
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertEquals(0, taskExecutor.getRunningTasks());
+    assertEquals(1, taskExecutor.completeTasks);
+
+    //verify the released resources
+    Thread.sleep(100);
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+  }
+
+  @Test
+  public void testTaskException() throws Exception {
+    int requestSize = 1;
+
+    RunExecutionBlockRequestProto.Builder
+        ebRequestProto = RunExecutionBlockRequestProto.newBuilder();
+    QueryId qid = LocalTajoTestingUtility.newQueryId();
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1);
+
+    ebRequestProto.setExecutionBlockId(ebId.getProto())
+        .setQueryMaster(workerContext.getConnectionInfo().getProto())
+        .setNodeId(workerContext.getConnectionInfo().getHost()+":"
+            + workerContext.getConnectionInfo().getQueryMasterPort())
+        .setContainerId("test")
+        .setQueryContext(new QueryContext(conf).getProto())
+        .setPlanJson("test")
+        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+
+    CallFuture<BatchAllocationResponseProto> callFuture  = new CallFuture<BatchAllocationResponseProto>();
+    BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+    requestProto.setExecutionBlockId(ebId.getProto());
+    requestProto.setExecutionBlockRequest(ebRequestProto.build());
+
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, 10, requestSize));
+
+    taskExecutor.throwException.set(true);
+    dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+
+    //verify running task
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertEquals(1, taskExecutor.getRunningTasks());
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertEquals(0, taskExecutor.getRunningTasks());
+    assertEquals(0, taskExecutor.completeTasks);
+
+    //verify the released resources
+    Thread.sleep(100);
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+  }
+
+  class TaskExecutor extends MockTaskExecutor {
+    int completeTasks;
+    AtomicBoolean throwException = new AtomicBoolean();
+
+    public TaskExecutor(Semaphore barrier, TaskManager taskManager, EventHandler rmEventHandler) {
+      super(barrier, taskManager, rmEventHandler);
+    }
+
+    @Override
+    protected void stopTask(TaskAttemptId taskId) {
+      super.stopTask(taskId);
+      super.barrier.release();
+    }
+
+    @Override
+    protected Task createTask(final ExecutionBlockContext context, TajoWorkerProtocol.TaskRequestProto taskRequest) {
+      final TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId());
+      final TaskAttemptContext taskAttemptContext = new TaskAttemptContext(null, context, taskAttemptId, null, null);
+
+      return new Task() {
+        @Override
+        public void init() throws IOException {
+
+          try {
+            Thread.sleep(50);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+
+        @Override
+        public void fetch() {
+          try {
+            Thread.sleep(50);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+
+        @Override
+        public void run() throws Exception {
+          Thread.sleep(50);
+
+          if(throwException.get()) throw new RuntimeException();
+
+          taskAttemptContext.stop();
+          taskAttemptContext.setProgress(1.0f);
+          taskAttemptContext.setState(TajoProtos.TaskAttemptState.TA_SUCCEEDED);
+          completeTasks++;
+        }
+
+        @Override
+        public void kill() {
+
+        }
+
+        @Override
+        public void abort() {
+
+        }
+
+        @Override
+        public void cleanup() {
+        }
+
+        @Override
+        public boolean hasFetchPhase() {
+          return false;
+        }
+
+        @Override
+        public boolean isProgressChanged() {
+          return false;
+        }
+
+        @Override
+        public boolean isStopped() {
+          return taskAttemptContext.isStopped();
+        }
+
+        @Override
+        public void updateProgress() {
+
+        }
+
+        @Override
+        public TaskAttemptContext getTaskContext() {
+          return taskAttemptContext;
+        }
+
+        @Override
+        public ExecutionBlockContext getExecutionBlockContext() {
+          return context;
+        }
+
+        @Override
+        public TajoWorkerProtocol.TaskStatusProto getReport() {
+          TajoWorkerProtocol.TaskStatusProto.Builder builder = TajoWorkerProtocol.TaskStatusProto.newBuilder();
+          builder.setWorkerName("localhost:0");
+          builder.setId(taskAttemptContext.getTaskId().getProto())
+              .setProgress(taskAttemptContext.getProgress())
+              .setState(taskAttemptContext.getState());
+
+          builder.setInputStats(new TableStats().getProto());
+          return builder.build();
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java
new file mode 100644
index 0000000..8bca489
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.tajo.*;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.event.ExecutionBlockStartEvent;
+import org.apache.tajo.worker.event.ExecutionBlockStopEvent;
+import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.*;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.junit.Assert.*;
+
+public class TestTaskManager {
+
+  private NodeResourceManager resourceManager;
+  private NodeStatusUpdater statusUpdater;
+  private TaskManager taskManager;
+  private TaskExecutor taskExecutor;
+  private AsyncDispatcher dispatcher;
+  private AsyncDispatcher taskDispatcher;
+  private TajoWorker.WorkerContext workerContext;
+
+  private CompositeService service;
+  private int taskMemory;
+  private TajoConf conf;
+  private Semaphore barrier;
+
+  @Before
+  public void setup() {
+    conf = new TajoConf();
+    conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+
+    taskMemory = 512;
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 4);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB,
+        taskMemory * conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES));
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS_NUM, 4);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1);
+
+    dispatcher = new AsyncDispatcher();
+    taskDispatcher = new AsyncDispatcher();
+
+    workerContext = new MockWorkerContext() {
+      WorkerConnectionInfo workerConnectionInfo;
+
+      @Override
+      public TajoConf getConf() {
+        return conf;
+      }
+
+      @Override
+      public WorkerConnectionInfo getConnectionInfo() {
+        if (workerConnectionInfo == null) {
+          workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+        }
+        return workerConnectionInfo;
+      }
+    };
+    barrier = new Semaphore(0);
+    taskManager = new MockTaskManager(barrier, taskDispatcher, workerContext, dispatcher.getEventHandler());
+    taskExecutor = new MockTaskExecutor(new Semaphore(0), taskManager, dispatcher.getEventHandler());
+    resourceManager = new NodeResourceManager(dispatcher, taskDispatcher.getEventHandler());
+    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext, resourceManager);
+
+    service = new CompositeService("MockService") {
+      @Override
+      protected void serviceInit(Configuration conf) throws Exception {
+        addIfService(dispatcher);
+        addIfService(taskDispatcher);
+        addIfService(taskManager);
+        addIfService(taskExecutor);
+        addIfService(resourceManager);
+        addIfService(statusUpdater);
+        super.serviceInit(conf);
+      }
+
+
+      @Override
+      protected void serviceStop() throws Exception {
+        workerContext.getWorkerSystemMetrics().stop();
+        super.serviceStop();
+      }
+    };
+
+    service.init(conf);
+    service.start();
+  }
+
+  @After
+  public void tearDown() {
+    service.stop();
+  }
+
+  @Test(timeout = 10000)
+  public void testExecutionBlockStart() throws Exception {
+    int requestSize = 1;
+
+    TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder
+        ebRequestProto = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder();
+    QueryId qid = LocalTajoTestingUtility.newQueryId();
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1);
+
+    ebRequestProto.setExecutionBlockId(ebId.getProto())
+        .setQueryMaster(workerContext.getConnectionInfo().getProto())
+        .setNodeId(workerContext.getConnectionInfo().getHost() + ":"
+            + workerContext.getConnectionInfo().getQueryMasterPort())
+        .setContainerId("test")
+        .setQueryContext(new QueryContext(conf).getProto())
+        .setPlanJson("test")
+        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+
+    CallFuture<BatchAllocationResponseProto> callFuture  = new CallFuture<BatchAllocationResponseProto>();
+    BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+    requestProto.setExecutionBlockId(ebId.getProto());
+    requestProto.setExecutionBlockRequest(ebRequestProto.build());
+
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize));
+
+    dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertNotNull(taskManager.getExecutionBlockContext(ebId));
+    assertEquals(ebId, taskManager.getExecutionBlockContext(ebId).getExecutionBlockId());
+  }
+
+  @Test(timeout = 10000)
+  public void testExecutionBlockStop() throws Exception {
+
+    TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder
+        ebRequestProto = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder();
+    QueryId qid = LocalTajoTestingUtility.newQueryId();
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1);
+
+    ebRequestProto.setExecutionBlockId(ebId.getProto())
+        .setQueryMaster(workerContext.getConnectionInfo().getProto())
+        .setNodeId(workerContext.getConnectionInfo().getHost()+":"
+            + workerContext.getConnectionInfo().getQueryMasterPort())
+        .setContainerId("test")
+        .setQueryContext(new QueryContext(conf).getProto())
+        .setPlanJson("test")
+        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+
+    taskDispatcher.getEventHandler().handle(new ExecutionBlockStartEvent(ebRequestProto.build()));
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertNotNull(taskManager.getExecutionBlockContext(ebId));
+    assertEquals(ebId, taskManager.getExecutionBlockContext(ebId).getExecutionBlockId());
+
+    ExecutionBlockListProto.Builder ebList = ExecutionBlockListProto.newBuilder();
+    taskDispatcher.getEventHandler().handle(new ExecutionBlockStopEvent(ebId.getProto(), ebList.build()));
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertNull(taskManager.getExecutionBlockContext(ebId));
+  }
+}


[3/3] tajo git commit: TAJO-1615: Implement TaskManager. (jinho)

Posted by jh...@apache.org.
TAJO-1615: Implement TaskManager. (jinho)

Closes #595


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/36da0dac
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/36da0dac
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/36da0dac

Branch: refs/heads/master
Commit: 36da0dac7d90fa12b3d9cf11fa5b561a586e60ff
Parents: dfcf41d
Author: Jinho Kim <jh...@apache.org>
Authored: Mon Jun 8 16:52:05 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Mon Jun 8 16:52:05 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../physical/HashShuffleFileWriteExec.java      |   2 +-
 .../engine/planner/physical/PhysicalExec.java   |   2 +-
 .../tajo/master/rm/TajoResourceTracker.java     |   4 +-
 .../tajo/util/TajoUncaughtExceptionHandler.java |  70 ++
 .../apache/tajo/util/history/HistoryWriter.java |   2 +-
 .../tajo/worker/ExecutionBlockContext.java      |  83 +-
 .../org/apache/tajo/worker/LegacyTaskImpl.java  | 844 +++++++++++++++++++
 .../apache/tajo/worker/NodeResourceManager.java |  45 +-
 .../apache/tajo/worker/NodeStatusUpdater.java   |  34 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |  47 +-
 .../tajo/worker/TajoWorkerManagerService.java   |   9 +-
 .../main/java/org/apache/tajo/worker/Task.java  | 834 +-----------------
 .../apache/tajo/worker/TaskAttemptContext.java  |  61 +-
 .../org/apache/tajo/worker/TaskContainer.java   |  85 ++
 .../org/apache/tajo/worker/TaskExecutor.java    | 194 +++++
 .../java/org/apache/tajo/worker/TaskImpl.java   | 838 ++++++++++++++++++
 .../org/apache/tajo/worker/TaskManager.java     | 180 ++++
 .../java/org/apache/tajo/worker/TaskRunner.java |  10 +-
 .../apache/tajo/worker/TaskRunnerHistory.java   |   1 +
 .../apache/tajo/worker/TaskRunnerManager.java   |  12 +-
 .../worker/event/ExecutionBlockStartEvent.java  |  35 +
 .../worker/event/ExecutionBlockStopEvent.java   |  37 +
 .../worker/event/NodeResourceAllocateEvent.java |   2 +-
 .../event/NodeResourceDeallocateEvent.java      |   2 +-
 .../tajo/worker/event/NodeResourceEvent.java    |  35 +
 .../worker/event/NodeResourceManagerEvent.java  |  34 -
 .../tajo/worker/event/NodeStatusEvent.java      |  11 +-
 .../tajo/worker/event/TaskExecutorEvent.java    |  44 +
 .../tajo/worker/event/TaskManagerEvent.java     |  43 +
 .../tajo/worker/event/TaskRunnerEvent.java      |   1 +
 .../tajo/worker/event/TaskRunnerStartEvent.java |  44 +-
 .../tajo/worker/event/TaskRunnerStopEvent.java  |   1 +
 .../tajo/worker/event/TaskStartEvent.java       |  44 +
 .../src/main/proto/TajoWorkerProtocol.proto     |   1 +
 .../apache/tajo/querymaster/TestKillQuery.java  | 135 ++-
 .../apache/tajo/worker/MockExecutionBlock.java  |  42 +
 .../tajo/worker/MockNodeResourceManager.java    |  96 +++
 .../tajo/worker/MockNodeStatusUpdater.java      |   4 +-
 .../apache/tajo/worker/MockTaskExecutor.java    | 141 ++++
 .../org/apache/tajo/worker/MockTaskManager.java |  59 ++
 .../apache/tajo/worker/MockWorkerContext.java   | 129 +++
 .../org/apache/tajo/worker/TestFetcher.java     |  14 +-
 .../tajo/worker/TestNodeResourceManager.java    | 135 +--
 .../tajo/worker/TestNodeStatusUpdater.java      |  54 +-
 .../apache/tajo/worker/TestTaskExecutor.java    | 330 ++++++++
 .../org/apache/tajo/worker/TestTaskManager.java | 185 ++++
 47 files changed, 3904 insertions(+), 1113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 44a8939..066e086 100644
--- a/CHANGES
+++ b/CHANGES
@@ -333,6 +333,8 @@ Release 0.11.0 - unreleased
 
   SUB TASKS
 
+    TAJO-1615: Implement TaskManager. (jinho)
+
     TAJO-1599: Implement NodeResourceManager and Status updater. (jinho)
 
     TAJO-1613: Rename StorageManager to Tablespace. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
index f1e2fe5..1a92a7a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -86,7 +86,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
     HashShuffleAppender appender = appenderMap.get(partId);
     if (appender == null) {
       appender = hashShuffleAppenderManager.getAppender(context.getConf(),
-          context.getQueryId().getTaskId().getExecutionBlockId(), partId, meta, outSchema);
+          context.getTaskId().getTaskId().getExecutionBlockId(), partId, meta, outSchema);
       appenderMap.put(partId, appender);
     }
     return appender;

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
index de14c9a..87a19a9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -79,7 +79,7 @@ public abstract class PhysicalExec implements SchemaObject {
   }
 
   protected Path getExecutorTmpDir() {
-    return new Path(context.getQueryId().getTaskId().getExecutionBlockId().getQueryId().toString(),
+    return new Path(context.getTaskId().getTaskId().getExecutionBlockId().getQueryId().toString(),
         UUID.randomUUID().toString());
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index af28886..2a18de7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -188,7 +188,9 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
   public void nodeHeartbeat(RpcController controller, TajoResourceTrackerProtocol.NodeHeartbeatRequestProto request,
                             RpcCallback<TajoResourceTrackerProtocol.NodeHeartbeatResponseProto> done) {
     //TODO implement with ResourceManager for scheduler
-    throw new RuntimeException(new ServiceException(new NotImplementedException().getMessage()));
+    TajoResourceTrackerProtocol.NodeHeartbeatResponseProto.Builder
+        response = TajoResourceTrackerProtocol.NodeHeartbeatResponseProto.newBuilder();
+    done.run(response.setCommand(TajoResourceTrackerProtocol.ResponseCommand.NORMAL).build());
   }
 
   private Worker createWorkerResource(NodeHeartbeat request) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/util/TajoUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/TajoUncaughtExceptionHandler.java b/tajo-core/src/main/java/org/apache/tajo/util/TajoUncaughtExceptionHandler.java
new file mode 100644
index 0000000..c424154
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/TajoUncaughtExceptionHandler.java
@@ -0,0 +1,70 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tajo.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+
+/**
+ * This class is intended to be installed by calling 
+ * {@link Thread#setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler)}
+ * in the main entry point.  It is intended to try and cleanly shut down
+ * programs using the Yarn Event framework.
+ * 
+ * Note: Right now it only halts the program if an OutOfMemoryError is caught.
+ * Any other Error, and any other exception, is just logged.
+ *
+ * This is an implementation copied from YarnUncaughtExceptionHandler.
+ */
+public class TajoUncaughtExceptionHandler implements UncaughtExceptionHandler {
+  private static final Log LOG = LogFactory.getLog(TajoUncaughtExceptionHandler.class);
+  
+  @Override
+  public void uncaughtException(Thread t, Throwable e) {
+    if(ShutdownHookManager.get().isShutdownInProgress()) {
+      LOG.error("Thread " + t + " threw an Throwable, but we are shutting " +
+      		"down, so ignoring this", e);
+    } else if(e instanceof Error) {
+      try {
+        LOG.fatal("Thread " + t + " threw an Error.", e);
+      } catch (Throwable err) {
+        //We don't want to not exit because of an issue with logging
+      }
+
+      if(e instanceof OutOfMemoryError) {
+        //After catching an OOM java says it is undefined behavior, so don't
+        //even try to clean up or we can get stuck on shutdown.
+        try {
+          System.err.println("Halting due to Out Of Memory Error...");
+        } catch (Throwable err) {
+          //Again, we don't want to fail to halt because of logging issues.
+        }
+        ExitUtil.halt(-1);
+      } else {
+        //ExitUtil.terminate(-1);
+      }
+    } else {
+      LOG.error("Thread " + t + " threw an Exception.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
index e8ba304..daced3e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
@@ -217,7 +217,7 @@ public class HistoryWriter extends AbstractService {
     public void run() {
       LOG.info("HistoryWriter_" + processName + " started.");
       SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
-      while (!stopped.get()) {
+      while (!stopped.get() && !Thread.interrupted()) {
         List<WriterFuture<WriterHolder>> histories = Lists.newArrayList();
 
         try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 0cc3304..9e4a60f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
@@ -45,7 +46,6 @@ import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
@@ -61,10 +61,10 @@ public class ExecutionBlockContext {
   private static final Log LOG = LogFactory.getLog(ExecutionBlockContext.class);
 
   private TaskRunnerManager manager;
-  public AtomicInteger completedTasksNum = new AtomicInteger();
-  public AtomicInteger succeededTasksNum = new AtomicInteger();
-  public AtomicInteger killedTasksNum = new AtomicInteger();
-  public AtomicInteger failedTasksNum = new AtomicInteger();
+  protected AtomicInteger completedTasksNum = new AtomicInteger();
+  protected AtomicInteger succeededTasksNum = new AtomicInteger();
+  protected AtomicInteger killedTasksNum = new AtomicInteger();
+  protected AtomicInteger failedTasksNum = new AtomicInteger();
 
   private FileSystem localFS;
   // for input files
@@ -95,17 +95,18 @@ public class ExecutionBlockContext {
   // It keeps all of the query unit attempts while a TaskRunner is running.
   private final ConcurrentMap<TaskAttemptId, Task> tasks = Maps.newConcurrentMap();
 
+  @Deprecated
   private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap();
 
-  public ExecutionBlockContext(TajoConf conf, TajoWorker.WorkerContext workerContext,
-                               TaskRunnerManager manager, QueryContext queryContext, String plan,
-                               ExecutionBlockId executionBlockId, WorkerConnectionInfo queryMaster,
-                               PlanProto.ShuffleType shuffleType) throws Throwable {
+  private final Map<TaskId, TaskHistory> taskHistories = Maps.newTreeMap();
+
+  public ExecutionBlockContext(TajoWorker.WorkerContext workerContext,
+                               TaskRunnerManager manager, RunExecutionBlockRequestProto request) throws IOException {
     this.manager = manager;
-    this.executionBlockId = executionBlockId;
+    this.executionBlockId = new ExecutionBlockId(request.getExecutionBlockId());
     this.connManager = RpcClientManager.getInstance();
-    this.queryMaster = queryMaster;
-    this.systemConf = conf;
+    this.queryMaster = new WorkerConnectionInfo(request.getQueryMaster());
+    this.systemConf = workerContext.getConf();
     this.reporter = new Reporter();
     this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
     this.localFS = FileSystem.getLocal(systemConf);
@@ -113,11 +114,11 @@ public class ExecutionBlockContext {
     // Setup QueryEngine according to the query plan
     // Here, we can setup row-based query engine or columnar query engine.
     this.queryEngine = new TajoQueryEngine(systemConf);
-    this.queryContext = queryContext;
-    this.plan = plan;
+    this.queryContext = new QueryContext(workerContext.getConf(), request.getQueryContext());
+    this.plan = request.getPlanJson();
     this.resource = new ExecutionBlockSharedResource();
     this.workerContext = workerContext;
-    this.shuffleType = shuffleType;
+    this.shuffleType = request.getShuffleType();
   }
 
   public void init() throws Throwable {
@@ -131,7 +132,8 @@ public class ExecutionBlockContext {
     UserGroupInformation.setConfiguration(systemConf);
     // TODO - 'load credential' should be implemented
     // Getting taskOwner
-    UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME));
+    UserGroupInformation
+        taskOwner = UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME));
 
     // initialize DFS and LocalFileSystems
     this.taskOwner = taskOwner;
@@ -144,7 +146,7 @@ public class ExecutionBlockContext {
       try {
         getStub().killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
       } catch (Throwable t) {
-        //ignore
+        LOG.error(t);
       }
       throw e;
     }
@@ -183,9 +185,9 @@ public class ExecutionBlockContext {
 
     // If ExecutionBlock is stopped, all running or pending tasks will be marked as failed.
     for (Task task : tasks.values()) {
-      if (task.getStatus() == TajoProtos.TaskAttemptState.TA_PENDING ||
-          task.getStatus() == TajoProtos.TaskAttemptState.TA_RUNNING) {
-        task.setState(TajoProtos.TaskAttemptState.TA_FAILED);
+      if (task.getTaskContext().getState() == TajoProtos.TaskAttemptState.TA_PENDING ||
+          task.getTaskContext().getState() == TajoProtos.TaskAttemptState.TA_RUNNING) {
+
         try{
           task.abort();
         } catch (Throwable e){
@@ -194,7 +196,7 @@ public class ExecutionBlockContext {
       }
     }
     tasks.clear();
-
+    taskHistories.clear();
     resource.release();
     RpcClientManager.cleanup(client);
   }
@@ -253,18 +255,40 @@ public class ExecutionBlockContext {
     return tasks.get(taskAttemptId);
   }
 
+  @Deprecated
   public void stopTaskRunner(String id){
     manager.stopTaskRunner(id);
   }
 
+  @Deprecated
   public TaskRunner getTaskRunner(String taskRunnerId){
     return manager.getTaskRunner(taskRunnerId);
   }
 
+  @Deprecated
   public void addTaskHistory(String taskRunnerId, TaskAttemptId quAttemptId, TaskHistory taskHistory) {
     histories.get(taskRunnerId).addTaskHistory(quAttemptId, taskHistory);
   }
 
+  public void addTaskHistory(TaskId taskId, TaskHistory taskHistory) {
+    taskHistories.put(taskId, taskHistory);
+  }
+
+  public Map<TaskId, TaskHistory> getTaskHistories() {
+    return taskHistories;
+  }
+
+  public void fatalError(TaskAttemptId taskAttemptId, String message) {
+    if (message == null) {
+      message = "No error message";
+    }
+    TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
+        .setId(taskAttemptId.getProto())
+        .setErrorMessage(message);
+
+    getStub().fatalError(null, builder.build(), NullCallback.get());
+  }
+
   public TaskRunnerHistory createTaskRunnerHistory(TaskRunner runner){
     histories.putIfAbsent(runner.getId(), new TaskRunnerHistory(runner.getContainerId(), executionBlockId));
     return histories.get(runner.getId());
@@ -355,7 +379,6 @@ public class ExecutionBlockContext {
 
   protected class Reporter {
     private Thread reporterThread;
-    private AtomicBoolean reporterStop = new AtomicBoolean();
     private static final int PROGRESS_INTERVAL = 1000;
     private static final int MAX_RETRIES = 10;
 
@@ -374,7 +397,7 @@ public class ExecutionBlockContext {
         int remainingRetries = MAX_RETRIES;
         @Override
         public void run() {
-          while (!reporterStop.get() && !Thread.interrupted()) {
+          while (!isStopped() && !Thread.interrupted()) {
 
             try {
               Interface masterStub = getStub();
@@ -384,13 +407,11 @@ public class ExecutionBlockContext {
               } else {
                 for (Task task : new ArrayList<Task>(tasks.values())){
 
-                  if (task.isRunning() && task.isProgressChanged()) {
-                    task.updateProgress();
+                  if (task.getTaskContext().getState() ==
+                      TajoProtos.TaskAttemptState.TA_RUNNING && task.isProgressChanged()) {
                     masterStub.statusUpdate(null, task.getReport(), NullCallback.get());
-                    task.getContext().setProgressChanged(false);
-                  } else {
-                    task.updateProgress();
                   }
+                  task.updateProgress();
                 }
               }
             } catch (Throwable t) {
@@ -402,7 +423,7 @@ public class ExecutionBlockContext {
                 throw new RuntimeException(t);
               }
             } finally {
-              if (remainingRetries > 0 && !reporterStop.get()) {
+              if (remainingRetries > 0 && !isStopped()) {
                 synchronized (reporterThread) {
                   try {
                     reporterThread.wait(PROGRESS_INTERVAL);
@@ -417,10 +438,6 @@ public class ExecutionBlockContext {
     }
 
     public void stop() throws InterruptedException {
-      if (reporterStop.getAndSet(true)) {
-        return;
-      }
-
       if (reporterThread != null) {
         // Intent of the lock is to not send an interupt in the middle of an
         // umbilical.ping or umbilical.statusUpdate

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
new file mode 100644
index 0000000..0721ef1
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
@@ -0,0 +1,844 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.engine.query.TaskRequest;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.plan.function.python.TajoScriptEngine;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.NetUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
+
+@Deprecated
+public class LegacyTaskImpl implements Task {
+  private static final Log LOG = LogFactory.getLog(LegacyTaskImpl.class);
+  private static final float FETCHER_PROGRESS = 0.5f;
+
+  private final TajoConf systemConf;
+  private final QueryContext queryContext;
+  private final ExecutionBlockContext executionBlockContext;
+  private final String taskRunnerId;
+
+  private final Path taskDir;
+  private final TaskRequest request;
+  private TaskAttemptContext context;
+  private List<Fetcher> fetcherRunners;
+  private LogicalNode plan;
+  private final Map<String, TableDesc> descs = Maps.newHashMap();
+  private PhysicalExec executor;
+  private boolean interQuery;
+  private Path inputTableBaseDir;
+
+  private long startTime;
+  private long finishTime;
+
+  private final TableStats inputStats;
+  private List<FileChunk> localChunks;
+
+  // TODO - to be refactored
+  private ShuffleType shuffleType = null;
+  private Schema finalSchema = null;
+  private TupleComparator sortComp = null;
+
+  public LegacyTaskImpl(String taskRunnerId,
+                        Path baseDir,
+                        TaskAttemptId taskId,
+                        final ExecutionBlockContext executionBlockContext,
+                        final TaskRequest request) throws IOException {
+    this(taskRunnerId, baseDir, taskId, executionBlockContext.getConf(), executionBlockContext, request);
+  }
+
+  public LegacyTaskImpl(String taskRunnerId,
+                        Path baseDir,
+                        TaskAttemptId taskId,
+                        TajoConf conf,
+                        final ExecutionBlockContext executionBlockContext,
+                        final TaskRequest request) throws IOException {
+    this.taskRunnerId = taskRunnerId;
+    this.request = request;
+
+    this.systemConf = conf;
+    this.queryContext = request.getQueryContext(systemConf);
+    this.executionBlockContext = executionBlockContext;
+    this.taskDir = StorageUtil.concatPath(baseDir,
+        taskId.getTaskId().getId() + "_" + taskId.getId());
+
+    this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskId,
+        request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
+    this.context.setDataChannel(request.getDataChannel());
+    this.context.setEnforcer(request.getEnforcer());
+    this.context.setState(TaskAttemptState.TA_PENDING);
+    this.inputStats = new TableStats();
+    this.fetcherRunners = Lists.newArrayList();
+  }
+
+  public void initPlan() throws IOException {
+    plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan());
+    LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
+    if (scanNode != null) {
+      for (LogicalNode node : scanNode) {
+        ScanNode scan = (ScanNode) node;
+        descs.put(scan.getCanonicalName(), scan.getTableDesc());
+      }
+    }
+
+    LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN);
+    if (partitionScanNode != null) {
+      for (LogicalNode node : partitionScanNode) {
+        PartitionedTableScanNode scan = (PartitionedTableScanNode) node;
+        descs.put(scan.getCanonicalName(), scan.getTableDesc());
+      }
+    }
+
+    interQuery = request.getProto().getInterQuery();
+    if (interQuery) {
+      context.setInterQuery();
+      this.shuffleType = context.getDataChannel().getShuffleType();
+
+      if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
+        SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
+        this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
+        this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
+      }
+    } else {
+      Path outFilePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(systemConf))
+          .getAppenderFilePath(getId(), queryContext.getStagingDir());
+      LOG.info("Output File Path: " + outFilePath);
+      context.setOutputPath(outFilePath);
+    }
+
+    this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>());
+    LOG.info("==================================");
+    LOG.info("* Stage " + request.getId() + " is initialized");
+    LOG.info("* InterQuery: " + interQuery
+        + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") +
+        ", Fragments (num: " + request.getFragments().size() + ")" +
+        ", Fetches (total:" + request.getFetches().size() + ") :");
+
+    if(LOG.isDebugEnabled()) {
+      for (FetchImpl f : request.getFetches()) {
+        LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs());
+      }
+    }
+    LOG.info("* Local task dir: " + taskDir);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("* plan:\n");
+      LOG.debug(plan.toString());
+    }
+    LOG.info("==================================");
+  }
+
+  private void startScriptExecutors() throws IOException {
+    for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
+      executor.start(systemConf);
+    }
+  }
+
+  private void stopScriptExecutors() {
+    for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
+      executor.shutdown();
+    }
+  }
+
+  @Override
+  public void init() throws IOException {
+    initPlan();
+    startScriptExecutors();
+
+    if (context.getState() == TaskAttemptState.TA_PENDING) {
+      // initialize a task temporal dir
+      FileSystem localFS = executionBlockContext.getLocalFS();
+      localFS.mkdirs(taskDir);
+
+      if (request.getFetches().size() > 0) {
+        inputTableBaseDir = localFS.makeQualified(
+            executionBlockContext.getLocalDirAllocator().getLocalPathForWrite(
+                getTaskAttemptDir(context.getTaskId()).toString(), systemConf));
+        localFS.mkdirs(inputTableBaseDir);
+        Path tableDir;
+        for (String inputTable : context.getInputTables()) {
+          tableDir = new Path(inputTableBaseDir, inputTable);
+          if (!localFS.exists(tableDir)) {
+            LOG.info("the directory is created  " + tableDir.toUri());
+            localFS.mkdirs(tableDir);
+          }
+        }
+      }
+      // for localizing the intermediate data
+      fetcherRunners.addAll(getFetchRunners(context, request.getFetches()));
+    }
+  }
+
+  private TaskAttemptId getId() {
+    return context.getTaskId();
+  }
+
+  public String toString() {
+    return "queryId: " + this.getId() + " status: " + context.getState();
+  }
+
+  @Override
+  public boolean isStopped() {
+    return context.isStopped();
+  }
+
+  @Override
+  public TaskAttemptContext getTaskContext() {
+    return context;
+  }
+
+  @Override
+  public ExecutionBlockContext getExecutionBlockContext() {
+    return executionBlockContext;
+  }
+
+  @Override
+  public boolean hasFetchPhase() {
+    return fetcherRunners.size() > 0;
+  }
+
+  @Override
+  public void fetch() {
+    ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher();
+    for (Fetcher f : fetcherRunners) {
+      executorService.submit(new FetchRunner(context, f));
+    }
+  }
+
+  @Override
+  public void kill() {
+    stopScriptExecutors();
+    context.setState(TaskAttemptState.TA_KILLED);
+    context.stop();
+  }
+
+  @Override
+  public void abort() {
+    stopScriptExecutors();
+    context.setState(TajoProtos.TaskAttemptState.TA_FAILED);
+    context.stop();
+  }
+
+  @Override
+  public TaskStatusProto getReport() {
+    TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
+    builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort());
+    builder.setId(context.getTaskId().getProto())
+        .setProgress(context.getProgress())
+        .setState(context.getState());
+
+    builder.setInputStats(reloadInputStats());
+
+    if (context.getResultStats() != null) {
+      builder.setResultStats(context.getResultStats().getProto());
+    }
+    return builder.build();
+  }
+
+  @Override
+  public boolean isProgressChanged() {
+    return context.isProgressChanged();
+  }
+
+  @Override
+  public void updateProgress() {
+    if(context != null && context.isStopped()){
+      return;
+    }
+
+    if (executor != null && context.getProgress() < 1.0f) {
+      context.setExecutorProgress(executor.getProgress());
+    }
+  }
+
+  private CatalogProtos.TableStatsProto reloadInputStats() {
+    synchronized(inputStats) {
+      if (this.executor == null) {
+        return inputStats.getProto();
+      }
+
+      TableStats executorInputStats = this.executor.getInputStats();
+
+      if (executorInputStats != null) {
+        inputStats.setValues(executorInputStats);
+      }
+      return inputStats.getProto();
+    }
+  }
+
+  private TaskCompletionReport getTaskCompletionReport() {
+    TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder();
+    builder.setId(context.getTaskId().getProto());
+
+    builder.setInputStats(reloadInputStats());
+
+    if (context.hasResultStats()) {
+      builder.setResultStats(context.getResultStats().getProto());
+    } else {
+      builder.setResultStats(new TableStats().getProto());
+    }
+
+    Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
+    if (it.hasNext()) {
+      do {
+        Entry<Integer, String> entry = it.next();
+        ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
+        part.setPartId(entry.getKey());
+
+        // Set output volume
+        if (context.getPartitionOutputVolume() != null) {
+          for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) {
+            if (entry.getKey().equals(e.getKey())) {
+              part.setVolume(e.getValue().longValue());
+              break;
+            }
+          }
+        }
+
+        builder.addShuffleFileOutputs(part.build());
+      } while (it.hasNext());
+    }
+
+    return builder.build();
+  }
+
+  private void waitForFetch() throws InterruptedException, IOException {
+    context.getFetchLatch().await();
+    LOG.info(context.getTaskId() + " All fetches are done!");
+    Collection<String> inputs = Lists.newArrayList(context.getInputTables());
+
+    // Get all broadcasted tables
+    Set<String> broadcastTableNames = new HashSet<String>();
+    List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST);
+    if (broadcasts != null) {
+      for (EnforceProperty eachBroadcast : broadcasts) {
+        broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName());
+      }
+    }
+
+    // localize the fetched data and skip the broadcast table
+    for (String inputTable: inputs) {
+      if (broadcastTableNames.contains(inputTable)) {
+        continue;
+      }
+      File tableDir = new File(context.getFetchIn(), inputTable);
+      FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
+      context.updateAssignedFragments(inputTable, frags);
+    }
+  }
+
+  @Override
+  public void run() throws Exception {
+    startTime = System.currentTimeMillis();
+    Throwable error = null;
+    try {
+      if(!context.isStopped()) {
+        context.setState(TaskAttemptState.TA_RUNNING);
+        if (context.hasFetchPhase()) {
+          // If the fetch is still in progress, the query unit must wait for
+          // it to complete.
+          waitForFetch();
+          context.setFetcherProgress(FETCHER_PROGRESS);
+          updateProgress();
+        }
+
+        this.executor = executionBlockContext.getTQueryEngine().
+            createPlan(context, plan);
+        this.executor.init();
+
+        while(!context.isStopped() && executor.next() != null) {
+        }
+      }
+    } catch (Throwable e) {
+      error = e ;
+      LOG.error(e.getMessage(), e);
+      stopScriptExecutors();
+      context.stop();
+    } finally {
+      if (executor != null) {
+        try {
+          executor.close();
+          reloadInputStats();
+        } catch (IOException e) {
+          LOG.error(e, e);
+        }
+        this.executor = null;
+      }
+
+      executionBlockContext.completedTasksNum.incrementAndGet();
+      context.getHashShuffleAppenderManager().finalizeTask(getId());
+
+      QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub();
+      if (context.isStopped()) {
+        context.setExecutorProgress(0.0f);
+
+        if (context.getState() == TaskAttemptState.TA_KILLED) {
+          queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
+          executionBlockContext.killedTasksNum.incrementAndGet();
+        } else {
+          context.setState(TaskAttemptState.TA_FAILED);
+          TaskFatalErrorReport.Builder errorBuilder =
+              TaskFatalErrorReport.newBuilder()
+                  .setId(getId().getProto());
+          if (error != null) {
+            if (error.getMessage() == null) {
+              errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
+            } else {
+              errorBuilder.setErrorMessage(error.getMessage());
+            }
+            errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
+          }
+
+          queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
+          executionBlockContext.failedTasksNum.incrementAndGet();
+        }
+      } else {
+        // if successful
+        context.stop();
+        context.setProgress(1.0f);
+        context.setState(TaskAttemptState.TA_SUCCEEDED);
+        executionBlockContext.succeededTasksNum.incrementAndGet();
+
+        TaskCompletionReport report = getTaskCompletionReport();
+        queryMasterStub.done(null, report, NullCallback.get());
+      }
+      finishTime = System.currentTimeMillis();
+      LOG.info(context.getTaskId() + " completed. " +
+          "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
+          ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
+          + ", killed: " + executionBlockContext.killedTasksNum.intValue()
+          + ", failed: " + executionBlockContext.failedTasksNum.intValue());
+    }
+  }
+
+  /**
+   * Records the history of this task and releases what it still holds: it is removed from the
+   * execution block's task map, its fetcher list is dropped, the executor is closed and the
+   * script executors are stopped.
+   * <p>
+   * The history snapshot is taken first because {@link #createTaskHistory()} may read
+   * {@code fetcherRunners}, which is cleared and set to {@code null} right afterwards.
+   */
+  @Override
+  public void cleanup() {
+    final TaskHistory history = createTaskHistory();
+    executionBlockContext.addTaskHistory(taskRunnerId, getId(), history);
+    executionBlockContext.getTasks().remove(getId());
+
+    fetcherRunners.clear();
+    fetcherRunners = null;
+
+    if (executor != null) {
+      try {
+        executor.close();
+        // Only forget the executor once it has been closed without an error.
+        executor = null;
+      } catch (IOException e) {
+        LOG.fatal(e.getMessage(), e);
+      }
+    }
+
+    executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(history);
+    stopScriptExecutors();
+  }
+
+  /**
+   * Builds a {@link TaskHistory} snapshot of this task from its current context.
+   * <p>
+   * Any exception raised while collecting the data is logged at WARN level and not rethrown, so the
+   * result may be only partially populated, or {@code null} when the history object itself could
+   * not be constructed.
+   *
+   * @return the history collected so far, or {@code null} if it could not be created
+   */
+  public TaskHistory createTaskHistory() {
+    TaskHistory history = null;
+    try {
+      history = new TaskHistory(context.getTaskId(), context.getState(), context.getProgress(),
+          startTime, finishTime, reloadInputStats());
+
+      if (context.getOutputPath() != null) {
+        history.setOutputPath(context.getOutputPath().toString());
+      }
+
+      if (context.getWorkDir() != null) {
+        history.setWorkingPath(context.getWorkDir().toString());
+      }
+
+      if (context.getResultStats() != null) {
+        history.setOutputStats(context.getResultStats().getProto());
+      }
+
+      if (hasFetchPhase()) {
+        history.setTotalFetchCount(fetcherRunners.size());
+
+        int finishedFetchCount = 0;
+        // The builder is reused; every field it carries is overwritten for each fetcher.
+        FetcherHistoryProto.Builder fetcherHistory = FetcherHistoryProto.newBuilder();
+        for (Fetcher fetcher : fetcherRunners) {
+          // TODO store the fetcher histories
+          if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) {
+            fetcherHistory
+                .setStartTime(fetcher.getStartTime())
+                .setFinishTime(fetcher.getFinishTime())
+                .setFileLength(fetcher.getFileLen())
+                .setMessageReceivedCount(fetcher.getMessageReceiveCount())
+                .setState(fetcher.getState());
+
+            history.addFetcherHistory(fetcherHistory.build());
+          }
+          if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) {
+            finishedFetchCount++;
+          }
+        }
+        history.setFinishedFetchCount(finishedFetchCount);
+      }
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+
+    return history;
+  }
+
+  /**
+   * Returns the hash code of {@code context}, which is the same field that
+   * {@link #equals(Object)} compares.
+   */
+  @Override
+  public int hashCode() {
+    return context.hashCode();
+  }
+
+  /**
+   * Two {@code LegacyTaskImpl} instances are equal when their {@code context} fields are equal.
+   *
+   * @param obj the object to compare with; {@code null} and other types are never equal
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof LegacyTaskImpl)) {
+      return false;
+    }
+    return this.context.equals(((LegacyTaskImpl) obj).context);
+  }
+
+  /**
+   * Turns fetched data on the local file system into fragments.
+   * <p>
+   * Every non-empty entry listed under {@code file} becomes one fragment spanning the whole entry.
+   * Chunks recorded in {@code localChunks} whose {@code getEbId()} equals {@code name} are appended
+   * as fragments covering the chunk's offset and length.
+   *
+   * @param file local path to list
+   * @param name fragment name; also matched against each local chunk's {@code getEbId()}
+   * @param meta not used by this method
+   * @return the collected fragments, possibly empty
+   * @throws IOException if the local file system cannot be obtained or listed
+   */
+  private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
+      throws IOException {
+    // Work on a copy of the configuration that is forced onto the local file system.
+    Configuration localConf = new Configuration(systemConf);
+    localConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
+    FileSystem localFs = FileSystem.get(localConf);
+
+    List<FileFragment> fragments = new ArrayList<FileFragment>();
+
+    for (FileStatus status : localFs.listStatus(new Path(file.getAbsolutePath()))) {
+      if (status.getLen() != 0) {
+        fragments.add(new FileFragment(name, status.getPath(), 0L, status.getLen()));
+      }
+    }
+
+    // Special treatment for locally pseudo fetched chunks
+    synchronized (localChunks) {
+      for (FileChunk chunk : localChunks) {
+        if (!name.equals(chunk.getEbId())) {
+          continue;
+        }
+        fragments.add(
+            new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length()));
+        LOG.info("One local chunk is added to listTablets");
+      }
+    }
+
+    return fragments.toArray(new FileFragment[fragments.size()]);
+  }
+
+  /**
+   * Runs a single {@link Fetcher}, retrying failed attempts with an increasing wait between them.
+   * <p>
+   * The outcome is reported in a {@code finally} block: when the fetcher ended in
+   * {@code FETCH_FINISHED} it is reported through {@code fetcherFinished}; otherwise the script
+   * executors and the task context are stopped and the fetch latch is counted down.
+   */
+  private class FetchRunner implements Runnable {
+    private final TaskAttemptContext ctx;
+    private final Fetcher fetcher;
+    private final int maxRetryNum;
+
+    public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
+      this.ctx = ctx;
+      this.fetcher = fetcher;
+      this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
+    }
+
+    @Override
+    public void run() {
+      int retryNum = 0;
+      int retryWaitTime = 1000; // milliseconds
+
+      try { // for releasing fetch latch
+        while (!context.isStopped() && retryNum < maxRetryNum) {
+          if (retryNum > 0) {
+            try {
+              Thread.sleep(retryWaitTime);
+              retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2);  // max 10 seconds
+            } catch (InterruptedException e) {
+              // Someone asked this thread to stop: keep the interrupt status visible to the caller
+              // and give up retrying. The finally block below reports the fetch as unfinished.
+              LOG.error("Interrupted while waiting to retry the fetch: " + fetcher.getURI(), e);
+              Thread.currentThread().interrupt();
+              break;
+            }
+            LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
+          }
+          try {
+            FileChunk fetched = fetcher.get();
+            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
+                && fetched.getFile() != null) {
+              if (!fetched.fromRemote()) {
+                // Chunks served from local disk are remembered so they can be read in place later.
+                localChunks.add(fetched);
+                LOG.info("Add a new FileChunk to local chunk list");
+              }
+              break;
+            }
+          } catch (Throwable e) {
+            LOG.error("Fetch failed: " + fetcher.getURI(), e);
+          }
+          retryNum++;
+        }
+      } finally {
+        if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) {
+          fetcherFinished(ctx);
+        } else {
+          if (retryNum == maxRetryNum) {
+            LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
+          }
+          stopScriptExecutors();
+          context.stop(); // retry task
+          ctx.getFetchLatch().countDown();
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public static float adjustFetchProcess(int totalFetcher, int remainFetcher) {
+    if (totalFetcher > 0) {
+      return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS;
+    } else {
+      return 0.0f;
+    }
+  }
+
+  private synchronized void fetcherFinished(TaskAttemptContext ctx) {
+    int fetcherSize = fetcherRunners.size();
+    if(fetcherSize == 0) {
+      return;
+    }
+
+    ctx.getFetchLatch().countDown();
+
+    int remainFetcher = (int) ctx.getFetchLatch().getCount();
+    if (remainFetcher == 0) {
+      context.setFetcherProgress(FETCHER_PROGRESS);
+    } else {
+      context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher));
+    }
+  }
+
+  /**
+   * Creates one {@link Fetcher} per fetch URI and registers the fetch phase on {@code ctx}.
+   * <p>
+   * When a URI addresses this worker's own pull server, the chunk is first looked up on local disk via
+   * {@link #getLocalStoredFileChunk(URI, TajoConf)}. A {@code null} lookup result without an error means
+   * there is nothing to fetch for that URI and it is skipped. If the lookup fails, or the URI addresses
+   * another host, the fetcher is set up to download into {@code in_<n>} under the store directory.
+   *
+   * @param ctx     task attempt context that receives the fetch phase
+   * @param fetches fetches to create runners for
+   * @return the created fetchers; empty when {@code fetches} is empty
+   * @throws IOException if the input directory or a host address cannot be resolved
+   */
+  private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
+                                        List<FetchImpl> fetches) throws IOException {
+
+    if (fetches.isEmpty()) {
+      return Lists.newArrayList();
+    }
+
+    Path inputDir = executionBlockContext.getLocalDirAllocator().
+        getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
+
+    int i = 0;
+    List<Fetcher> runnerList = Lists.newArrayList();
+
+    for (FetchImpl f : fetches) {
+      File storeDir = new File(inputDir.toString(), f.getName());
+      if (!storeDir.exists()) {
+        storeDir.mkdirs();
+      }
+
+      for (URI uri : f.getURIs()) {
+        File defaultStoreFile = new File(storeDir, "in_" + i);
+        InetAddress address = InetAddress.getByName(uri.getHost());
+
+        // Scoped per URI so that a chunk resolved for an earlier URI can never leak into this one.
+        FileChunk storeChunk = null;
+
+        WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
+        if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
+          boolean hasError = false;
+          try {
+            LOG.info("Try to get local file chunk at local host");
+            storeChunk = getLocalStoredFileChunk(uri, systemConf);
+          } catch (Throwable t) {
+            // Not fatal: the data is fetched through the pull server instead. Log the cause so the
+            // fallback is visible.
+            LOG.warn("Failed to get a local file chunk for " + uri + "; it will be fetched instead", t);
+            hasError = true;
+          }
+
+          // When a range request is out of range, storeChunk will be NULL. This case is normal state.
+          // So, we should skip and don't need to create storeChunk.
+          if (storeChunk == null && !hasError) {
+            continue;
+          }
+
+          if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1
+              && !hasError) {
+            storeChunk.setFromRemote(false);
+          } else {
+            storeChunk = new FileChunk(defaultStoreFile, 0, -1);
+            storeChunk.setFromRemote(true);
+          }
+        } else {
+          storeChunk = new FileChunk(defaultStoreFile, 0, -1);
+          storeChunk.setFromRemote(true);
+        }
+
+        // If we decide that intermediate data should be really fetched from a remote host, storeChunk
+        // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
+        storeChunk.setEbId(f.getName());
+        Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
+        LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
+        runnerList.add(fetcher);
+        i++;
+      }
+    }
+    ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
+    return runnerList;
+  }
+
+  /**
+   * Resolves a pull-server fetch URI against the local directories of this worker.
+   * <p>
+   * The query string must carry {@code qid}, {@code type}, {@code sid} and {@code p}, with exactly one
+   * value each for {@code qid}, {@code type} and {@code sid}. For a range shuffle ({@code type=r}) the
+   * {@code ta} parameter is required and {@code start}, {@code end} and the optional {@code final} are
+   * read as well. For a hash ({@code h}) or scattered hash ({@code s}) shuffle the optional
+   * {@code offset} and {@code length} select a part of the partition file.
+   *
+   * @param fetchURI the fetch URI to resolve
+   * @param conf     configuration used to look up the local directories
+   * @return the local chunk, or {@code null} when the URI is invalid, the shuffle type is unknown, the
+   *         data does not exist locally, the start position is beyond the file, or the range lookup failed
+   * @throws IOException if a local path cannot be resolved
+   */
+  private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
+    // Parse the URI
+    LOG.info("getLocalStoredFileChunk starts");
+    final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters();
+    final List<String> types = params.get("type");
+    final List<String> qids = params.get("qid");
+    final List<String> taskIdList = params.get("ta");
+    final List<String> stageIds = params.get("sid");
+    final List<String> partIds = params.get("p");
+    final List<String> offsetList = params.get("offset");
+    final List<String> lengthList = params.get("length");
+
+    if (types == null || stageIds == null || qids == null || partIds == null) {
+      LOG.error("Invalid URI - Required queryId, type, stage Id, and part id");
+      return null;
+    }
+
+    // Each of these must be given exactly once; any single violation makes the URI invalid.
+    if (qids.size() != 1 || types.size() != 1 || stageIds.size() != 1) {
+      LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id");
+      return null;
+    }
+
+    String queryId = qids.get(0);
+    String shuffleType = types.get(0);
+    String sid = stageIds.get(0);
+    String partId = partIds.get(0);
+
+    if (shuffleType.equals("r") && taskIdList == null) {
+      LOG.error("Invalid URI - For range shuffle, taskId is required");
+      return null;
+    }
+    List<String> taskIds = splitMaps(taskIdList);
+
+    FileChunk chunk = null;
+    // -1 marks "not given"; both must be present for a partial read of a hash partition.
+    long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
+    long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
+
+    LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
+        + ", taskIds=" + taskIdList);
+
+    // The working directory of Tajo worker for each query, including stage
+    String queryBaseDir = queryId + "/output" + "/" + sid + "/";
+
+    // If the stage requires a range shuffle
+    if (shuffleType.equals("r")) {
+      String ta = taskIds.get(0);
+      if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) {
+        LOG.warn("Range shuffle - file not exist");
+        return null;
+      }
+      Path path = executionBlockContext.getLocalFS().makeQualified(
+          executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
+      String startKey = params.get("start").get(0);
+      String endKey = params.get("end").get(0);
+      boolean last = params.get("final") != null;
+
+      try {
+        chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last);
+      } catch (Throwable t) {
+        // Keep the cause in the log; the caller only sees a null chunk.
+        LOG.error("getFileChunks() throws exception", t);
+        return null;
+      }
+
+      // If the stage requires a hash shuffle or a scattered hash shuffle
+    } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
+      int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+      String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
+      if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
+        LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
+        return null;
+      }
+      Path path = executionBlockContext.getLocalFS().makeQualified(
+          executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
+      File file = new File(path.toUri());
+      long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+      long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+
+      if (startPos >= file.length()) {
+        LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
+        return null;
+      }
+      chunk = new FileChunk(file, startPos, readLen);
+
+    } else {
+      LOG.error("Unknown shuffle type");
+      return null;
+    }
+
+    return chunk;
+  }
+
+  /**
+   * Splits every entry of {@code mapq} on commas and returns all pieces in encounter order.
+   *
+   * @param mapq comma-separated entries, may be {@code null}
+   * @return the flattened pieces, or {@code null} when {@code mapq} is {@code null}
+   */
+  private List<String> splitMaps(List<String> mapq) {
+    if (mapq == null) {
+      return null;
+    }
+    final List<String> pieces = new ArrayList<String>();
+    for (String joined : mapq) {
+      for (String piece : joined.split(",")) {
+        pieces.add(piece);
+      }
+    }
+    return pieces;
+  }
+
+  /**
+   * Returns the directory of a task attempt: the base input directory of its execution block,
+   * followed by the task id and then the attempt id.
+   *
+   * @param quid the task attempt id
+   * @return the concatenated path
+   */
+  public static Path getTaskAttemptDir(TaskAttemptId quid) {
+    String taskDirName = String.valueOf(quid.getTaskId().getId());
+    String attemptDirName = String.valueOf(quid.getId());
+    return StorageUtil.concatPath(
+        ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
+        taskDirName,
+        attemptDirName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
index 20eec6b..e763d13 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
@@ -30,27 +30,23 @@ import org.apache.tajo.resource.NodeResources;
 import org.apache.tajo.storage.DiskUtil;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
-import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
-import org.apache.tajo.worker.event.NodeResourceManagerEvent;
-import org.apache.tajo.worker.event.NodeStatusEvent;
-
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.tajo.worker.event.*;
 
 import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
 
-public class NodeResourceManager extends AbstractService implements EventHandler<NodeResourceManagerEvent> {
+public class NodeResourceManager extends AbstractService implements EventHandler<NodeResourceEvent> {
   private static final Log LOG = LogFactory.getLog(NodeResourceManager.class);
 
   private final Dispatcher dispatcher;
+  private final EventHandler taskEventHandler;
   private NodeResource totalResource;
   private NodeResource availableResource;
-  private AtomicInteger allocatedSize;
   private TajoConf tajoConf;
 
-  public NodeResourceManager(Dispatcher dispatcher){
+  public NodeResourceManager(Dispatcher dispatcher, EventHandler taskEventHandler) {
     super(NodeResourceManager.class.getName());
     this.dispatcher = dispatcher;
+    this.taskEventHandler = taskEventHandler;
   }
 
   @Override
@@ -61,14 +57,14 @@ public class NodeResourceManager extends AbstractService implements EventHandler
     this.tajoConf = (TajoConf)conf;
     this.totalResource = createWorkerResource(tajoConf);
     this.availableResource = NodeResources.clone(totalResource);
-    this.dispatcher.register(NodeResourceManagerEvent.EventType.class, this);
-    this.allocatedSize = new AtomicInteger();
+    this.dispatcher.register(NodeResourceEvent.EventType.class, this);
+
     super.serviceInit(conf);
     LOG.info("Initialized NodeResourceManager for " + totalResource);
   }
 
   @Override
-  public void handle(NodeResourceManagerEvent event) {
+  public void handle(NodeResourceEvent event) {
 
     if (event instanceof NodeResourceAllocateEvent) {
       NodeResourceAllocateEvent allocateEvent = (NodeResourceAllocateEvent) event;
@@ -76,22 +72,27 @@ public class NodeResourceManager extends AbstractService implements EventHandler
       for (TaskAllocationRequestProto request : allocateEvent.getRequest().getTaskRequestList()) {
         NodeResource resource = new NodeResource(request.getResource());
         if (allocate(resource)) {
-          allocatedSize.incrementAndGet();
-          //TODO send task event to taskExecutor
+          if(allocateEvent.getRequest().hasExecutionBlockRequest()){
+            //send ExecutionBlock start event to TaskManager
+            startExecutionBlock(allocateEvent.getRequest().getExecutionBlockRequest());
+          }
+
+          //send task start event to TaskExecutor
+          startTask(request.getTaskRequest(), resource);
         } else {
+          // reject the exceeded requests
           response.addCancellationTask(request);
         }
       }
       allocateEvent.getCallback().run(response.build());
 
     } else if (event instanceof NodeResourceDeallocateEvent) {
-      allocatedSize.decrementAndGet();
       NodeResourceDeallocateEvent deallocateEvent = (NodeResourceDeallocateEvent) event;
       release(deallocateEvent.getResource());
 
       // send current resource to ResourceTracker
       getDispatcher().getEventHandler().handle(
-          new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE, getAvailableResource()));
+          new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE));
     }
   }
 
@@ -107,10 +108,6 @@ public class NodeResourceManager extends AbstractService implements EventHandler
     return availableResource;
   }
 
-  public int getAllocatedSize() {
-    return allocatedSize.get();
-  }
-
   private boolean allocate(NodeResource resource) {
     //TODO consider the jvm free memory
     if (NodeResources.fitsIn(resource, availableResource)) {
@@ -120,6 +117,14 @@ public class NodeResourceManager extends AbstractService implements EventHandler
     return false;
   }
 
+  protected void startExecutionBlock(RunExecutionBlockRequestProto request) {
+    taskEventHandler.handle(new ExecutionBlockStartEvent(request));
+  }
+
+  protected void startTask(TaskRequestProto request, NodeResource resource) {
+    taskEventHandler.handle(new TaskStartEvent(request, resource));
+  }
+
   private void release(NodeResource resource) {
     NodeResources.addTo(availableResource, resource);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
index 84ac419..d13cd50 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
@@ -57,16 +57,16 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
   private volatile boolean isStopped;
   private volatile long heartBeatInterval;
   private BlockingQueue<NodeStatusEvent> heartBeatRequestQueue;
-  private final WorkerConnectionInfo connectionInfo;
+  private final TajoWorker.WorkerContext workerContext;
   private final NodeResourceManager nodeResourceManager;
   private AsyncRpcClient rmClient;
   private ServiceTracker serviceTracker;
   private TajoResourceTrackerProtocolService.Interface resourceTracker;
   private int queueingLimit;
 
-  public NodeStatusUpdater(WorkerConnectionInfo connectionInfo, NodeResourceManager resourceManager) {
+  public NodeStatusUpdater(TajoWorker.WorkerContext workerContext, NodeResourceManager resourceManager) {
     super(NodeStatusUpdater.class.getSimpleName());
-    this.connectionInfo = connectionInfo;
+    this.workerContext = workerContext;
     this.nodeResourceManager = resourceManager;
   }
 
@@ -99,7 +99,8 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
     this.isStopped = true;
 
     synchronized (updaterThread) {
-      updaterThread.notifyAll();
+      updaterThread.interrupt();
+      updaterThread.join();
     }
     super.serviceStop();
     LOG.info("NodeStatusUpdater stopped.");
@@ -107,14 +108,7 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
 
   @Override
   public void handle(NodeStatusEvent event) {
-    switch (event.getType()) {
-      case REPORT_RESOURCE:
-        heartBeatRequestQueue.add(event); //batch report to ResourceTracker
-        break;
-      case FLUSH_REPORTS:
-        heartBeatRequestQueue.add(event); //flush report to ResourceTracker
-        break;
-    }
+    heartBeatRequestQueue.add(event);
   }
 
   public int getQueueSize() {
@@ -128,13 +122,13 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
   private NodeHeartbeatRequestProto createResourceReport(NodeResource resource) {
     NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
     requestProto.setAvailableResource(resource.getProto());
-    requestProto.setWorkerId(connectionInfo.getId());
+    requestProto.setWorkerId(workerContext.getConnectionInfo().getId());
     return requestProto.build();
   }
 
   private NodeHeartbeatRequestProto createHeartBeatReport() {
     NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
-    requestProto.setWorkerId(connectionInfo.getId());
+    requestProto.setWorkerId(workerContext.getConnectionInfo().getId());
     return requestProto.build();
   }
 
@@ -142,8 +136,8 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
     NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
     requestProto.setTotalResource(nodeResourceManager.getTotalResource().getProto());
     requestProto.setAvailableResource(nodeResourceManager.getAvailableResource().getProto());
-    requestProto.setWorkerId(connectionInfo.getId());
-    requestProto.setConnectionInfo(connectionInfo.getProto());
+    requestProto.setWorkerId(workerContext.getConnectionInfo().getId());
+    requestProto.setConnectionInfo(workerContext.getConnectionInfo().getProto());
 
     //TODO set node status to requestProto.setStatus()
     return requestProto.build();
@@ -231,8 +225,8 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
               }
 
               if (!events.isEmpty()) {
-                // send last available resource;
-                lastResponse = sendHeartbeat(createResourceReport(events.get(events.size() - 1).getResource()));
+                // send current available resource;
+                lastResponse = sendHeartbeat(createResourceReport(nodeResourceManager.getAvailableResource()));
               } else {
                 // send ping;
                 lastResponse = sendHeartbeat(createHeartBeatReport());
@@ -250,10 +244,10 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
           }
         } catch (NoSuchMethodException nsme) {
           LOG.fatal(nsme.getMessage(), nsme);
-          Runtime.getRuntime().halt(1);
+          Runtime.getRuntime().halt(-1);
         } catch (ClassNotFoundException cnfe) {
           LOG.fatal(cnfe.getMessage(), cnfe);
-          Runtime.getRuntime().halt(1);
+          Runtime.getRuntime().halt(-1);
         } catch (Exception e) {
           LOG.error(e.getMessage(), e);
           if (!isStopped) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 4f07ca6..fbd070e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -163,7 +163,7 @@ public class TajoWorker extends CompositeService {
 
     serviceTracker = ServiceTrackerFactory.get(systemConf);
 
-    this.workerContext = new WorkerContext();
+    this.workerContext = new TajoWorkerContext();
     this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
 
     String resourceManagerClassName = systemConf.getVar(ConfVars.RESOURCE_MANAGER_CLASS);
@@ -386,7 +386,45 @@ public class TajoWorker extends CompositeService {
     LOG.info("TajoWorker main thread exiting");
   }
 
-  public class WorkerContext {
+  public interface WorkerContext {
+    QueryMaster getQueryMaster();
+
+    TajoConf getConf();
+
+    ServiceTracker getServiceTracker();
+
+    QueryMasterManagerService getQueryMasterManagerService();
+
+    TaskRunnerManager getTaskRunnerManager();
+
+    CatalogService getCatalog();
+
+    WorkerConnectionInfo getConnectionInfo();
+
+    String getWorkerName();
+
+    LocalDirAllocator getLocalDirAllocator();
+
+    ClusterResourceSummary getClusterResource();
+
+    TajoSystemMetrics getWorkerSystemMetrics();
+
+    HashShuffleAppenderManager getHashShuffleAppenderManager();
+
+    HistoryWriter getTaskHistoryWriter();
+
+    HistoryReader getHistoryReader();
+
+    void cleanup(String strPath);
+
+    void cleanupTemporalDirectories();
+
+    void setClusterResource(ClusterResourceSummary clusterResource);
+
+    void setNumClusterNodes(int numClusterNodes);
+  }
+
+  class TajoWorkerContext implements WorkerContext {
     public QueryMaster getQueryMaster() {
       if (queryMasterManagerService == null) {
         return null;
@@ -430,7 +468,7 @@ public class TajoWorker extends CompositeService {
       return lDirAllocator;
     }
 
-    protected void cleanup(String strPath) {
+    public void cleanup(String strPath) {
       if (deletionService == null) return;
 
       LocalDirAllocator lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
@@ -446,7 +484,7 @@ public class TajoWorker extends CompositeService {
       }
     }
 
-    protected void cleanupTemporalDirectories() {
+    public void cleanupTemporalDirectories() {
       if (deletionService == null) return;
 
       LocalDirAllocator lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
@@ -627,6 +665,7 @@ public class TajoWorker extends CompositeService {
   }
 
   public static void main(String[] args) throws Exception {
+    Thread.setDefaultUncaughtExceptionHandler(new TajoUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(TajoWorker.class, args, LOG);
 
     TajoConf tajoConf = new TajoConf();

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index bbf8564..de8afe8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -116,14 +116,7 @@ public class TajoWorkerManagerService extends CompositeService
     workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc();
 
     try {
-      workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStartEvent(
-          new WorkerConnectionInfo(request.getQueryMaster())
-          , new ExecutionBlockId(request.getExecutionBlockId())
-          , request.getContainerId()
-          , new QueryContext(workerContext.getConf(), request.getQueryContext()),
-          request.getPlanJson(),
-          request.getShuffleType()
-      ));
+      workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStartEvent(request));
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Throwable t) {
       LOG.error(t.getMessage(), t);


[2/3] tajo git commit: TAJO-1615: Implement TaskManager. (jinho)

Posted by jh...@apache.org.
http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 4716dcc..c849940 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -18,839 +18,35 @@
 
 package org.apache.tajo.worker;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import io.netty.handler.codec.http.QueryStringDecoder;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.TajoProtos.TaskAttemptState;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.physical.PhysicalExec;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.engine.query.TaskRequest;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.*;
-import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.plan.function.python.TajoScriptEngine;
-import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 
-import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.URI;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ExecutorService;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
+public interface Task {
 
-public class Task {
-  private static final Log LOG = LogFactory.getLog(Task.class);
-  private static final float FETCHER_PROGRESS = 0.5f;
+  void init() throws IOException;
 
-  private final TajoConf systemConf;
-  private final QueryContext queryContext;
-  private final ExecutionBlockContext executionBlockContext;
-  private final TaskAttemptId taskId;
-  private final String taskRunnerId;
+  void fetch();
 
-  private final Path taskDir;
-  private final TaskRequest request;
-  private TaskAttemptContext context;
-  private List<Fetcher> fetcherRunners;
-  private LogicalNode plan;
-  private final Map<String, TableDesc> descs = Maps.newHashMap();
-  private PhysicalExec executor;
-  private boolean interQuery;
-  private Path inputTableBaseDir;
+  void run() throws Exception;
 
-  private long startTime;
-  private long finishTime;
+  void kill();
 
-  private final TableStats inputStats;
-  private List<FileChunk> localChunks;
+  void abort();
 
-  // TODO - to be refactored
-  private ShuffleType shuffleType = null;
-  private Schema finalSchema = null;
-  private TupleComparator sortComp = null;
+  void cleanup();
 
-  public Task(String taskRunnerId,
-              Path baseDir,
-              TaskAttemptId taskId,
-              final ExecutionBlockContext executionBlockContext,
-              final TaskRequest request) throws IOException {
-    this(taskRunnerId, baseDir, taskId, executionBlockContext.getConf(), executionBlockContext, request);
-  }
+  boolean hasFetchPhase();
 
-  public Task(String taskRunnerId,
-              Path baseDir,
-              TaskAttemptId taskId,
-              TajoConf conf,
-              final ExecutionBlockContext executionBlockContext,
-              final TaskRequest request) throws IOException {
-    this.taskRunnerId = taskRunnerId;
-    this.request = request;
-    this.taskId = taskId;
+  boolean isProgressChanged();
 
-    this.systemConf = conf;
-    this.queryContext = request.getQueryContext(systemConf);
-    this.executionBlockContext = executionBlockContext;
-    this.taskDir = StorageUtil.concatPath(baseDir,
-        taskId.getTaskId().getId() + "_" + taskId.getId());
+  boolean isStopped();
 
-    this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskId,
-        request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
-    this.context.setDataChannel(request.getDataChannel());
-    this.context.setEnforcer(request.getEnforcer());
-    this.context.setState(TaskAttemptState.TA_PENDING);
-    this.inputStats = new TableStats();
-    this.fetcherRunners = Lists.newArrayList();
-  }
+  void updateProgress();
 
-  public void initPlan() throws IOException {
-    plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan());
-    LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
-    if (scanNode != null) {
-      for (LogicalNode node : scanNode) {
-        ScanNode scan = (ScanNode) node;
-        descs.put(scan.getCanonicalName(), scan.getTableDesc());
-      }
-    }
+  TaskAttemptContext getTaskContext();
 
-    LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN);
-    if (partitionScanNode != null) {
-      for (LogicalNode node : partitionScanNode) {
-        PartitionedTableScanNode scan = (PartitionedTableScanNode) node;
-        descs.put(scan.getCanonicalName(), scan.getTableDesc());
-      }
-    }
+  ExecutionBlockContext getExecutionBlockContext();
 
-    interQuery = request.getProto().getInterQuery();
-    if (interQuery) {
-      context.setInterQuery();
-      this.shuffleType = context.getDataChannel().getShuffleType();
-
-      if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
-        SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
-        this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
-        this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
-      }
-    } else {
-      Path outFilePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(systemConf))
-          .getAppenderFilePath(taskId, queryContext.getStagingDir());
-      LOG.info("Output File Path: " + outFilePath);
-      context.setOutputPath(outFilePath);
-    }
-
-    this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>());
-    LOG.info("==================================");
-    LOG.info("* Stage " + request.getId() + " is initialized");
-    LOG.info("* InterQuery: " + interQuery
-        + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") +
-        ", Fragments (num: " + request.getFragments().size() + ")" +
-        ", Fetches (total:" + request.getFetches().size() + ") :");
-
-    if(LOG.isDebugEnabled()) {
-      for (FetchImpl f : request.getFetches()) {
-        LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs());
-      }
-    }
-    LOG.info("* Local task dir: " + taskDir);
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("* plan:\n");
-      LOG.debug(plan.toString());
-    }
-    LOG.info("==================================");
-  }
-
-  private void startScriptExecutors() throws IOException {
-    for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
-      executor.start(systemConf);
-    }
-  }
-
-  private void stopScriptExecutors() {
-    for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
-      executor.shutdown();
-    }
-  }
-
-  public void init() throws IOException {
-    initPlan();
-    startScriptExecutors();
-
-    if (context.getState() == TaskAttemptState.TA_PENDING) {
-      // initialize a task temporal dir
-      FileSystem localFS = executionBlockContext.getLocalFS();
-      localFS.mkdirs(taskDir);
-
-      if (request.getFetches().size() > 0) {
-        inputTableBaseDir = localFS.makeQualified(
-            executionBlockContext.getLocalDirAllocator().getLocalPathForWrite(
-                getTaskAttemptDir(context.getTaskId()).toString(), systemConf));
-        localFS.mkdirs(inputTableBaseDir);
-        Path tableDir;
-        for (String inputTable : context.getInputTables()) {
-          tableDir = new Path(inputTableBaseDir, inputTable);
-          if (!localFS.exists(tableDir)) {
-            LOG.info("the directory is created  " + tableDir.toUri());
-            localFS.mkdirs(tableDir);
-          }
-        }
-      }
-      // for localizing the intermediate data
-      fetcherRunners.addAll(getFetchRunners(context, request.getFetches()));
-    }
-  }
-
-  public TaskAttemptId getTaskId() {
-    return taskId;
-  }
-
-  public TaskAttemptId getId() {
-    return context.getTaskId();
-  }
-
-  public TaskAttemptState getStatus() {
-    return context.getState();
-  }
-
-  public String toString() {
-    return "queryId: " + this.getId() + " status: " + this.getStatus();
-  }
-
-  public void setState(TaskAttemptState status) {
-    context.setState(status);
-  }
-
-  public TaskAttemptContext getContext() {
-    return context;
-  }
-
-  public boolean hasFetchPhase() {
-    return fetcherRunners.size() > 0;
-  }
-
-  public List<Fetcher> getFetchers() {
-    return new ArrayList<Fetcher>(fetcherRunners);
-  }
-
-  public void fetch() {
-    ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher();
-    for (Fetcher f : fetcherRunners) {
-      executorService.submit(new FetchRunner(context, f));
-    }
-  }
-
-  public void kill() {
-    stopScriptExecutors();
-    context.setState(TaskAttemptState.TA_KILLED);
-    context.stop();
-  }
-
-  public void abort() {
-    stopScriptExecutors();
-    context.stop();
-  }
-
-  public void cleanUp() {
-    // remove itself from worker
-    if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
-      synchronized (executionBlockContext.getTasks()) {
-        executionBlockContext.getTasks().remove(this.getId());
-      }
-    } else {
-      LOG.error("TaskAttemptId: " + context.getTaskId() + " status: " + context.getState());
-    }
-  }
-
-  public TaskStatusProto getReport() {
-    TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
-    builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort());
-    builder.setId(context.getTaskId().getProto())
-        .setProgress(context.getProgress())
-        .setState(context.getState());
-
-    builder.setInputStats(reloadInputStats());
-
-    if (context.getResultStats() != null) {
-      builder.setResultStats(context.getResultStats().getProto());
-    }
-    return builder.build();
-  }
-
-  public boolean isRunning(){
-    return context.getState() == TaskAttemptState.TA_RUNNING;
-  }
-
-  public boolean isProgressChanged() {
-    return context.isProgressChanged();
-  }
-
-  public void updateProgress() {
-    if(context != null && context.isStopped()){
-      return;
-    }
-
-    if (executor != null && context.getProgress() < 1.0f) {
-      context.setExecutorProgress(executor.getProgress());
-    }
-  }
-
-  private CatalogProtos.TableStatsProto reloadInputStats() {
-    synchronized(inputStats) {
-      if (this.executor == null) {
-        return inputStats.getProto();
-      }
-
-      TableStats executorInputStats = this.executor.getInputStats();
-
-      if (executorInputStats != null) {
-        inputStats.setValues(executorInputStats);
-      }
-      return inputStats.getProto();
-    }
-  }
-
-  private TaskCompletionReport getTaskCompletionReport() {
-    TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder();
-    builder.setId(context.getTaskId().getProto());
-
-    builder.setInputStats(reloadInputStats());
-
-    if (context.hasResultStats()) {
-      builder.setResultStats(context.getResultStats().getProto());
-    } else {
-      builder.setResultStats(new TableStats().getProto());
-    }
-
-    Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
-    if (it.hasNext()) {
-      do {
-        Entry<Integer, String> entry = it.next();
-        ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
-        part.setPartId(entry.getKey());
-
-        // Set output volume
-        if (context.getPartitionOutputVolume() != null) {
-          for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) {
-            if (entry.getKey().equals(e.getKey())) {
-              part.setVolume(e.getValue().longValue());
-              break;
-            }
-          }
-        }
-
-        builder.addShuffleFileOutputs(part.build());
-      } while (it.hasNext());
-    }
-
-    return builder.build();
-  }
-
-  private void waitForFetch() throws InterruptedException, IOException {
-    context.getFetchLatch().await();
-    LOG.info(context.getTaskId() + " All fetches are done!");
-    Collection<String> inputs = Lists.newArrayList(context.getInputTables());
-
-    // Get all broadcasted tables
-    Set<String> broadcastTableNames = new HashSet<String>();
-    List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST);
-    if (broadcasts != null) {
-      for (EnforceProperty eachBroadcast : broadcasts) {
-        broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName());
-      }
-    }
-
-    // localize the fetched data and skip the broadcast table
-    for (String inputTable: inputs) {
-      if (broadcastTableNames.contains(inputTable)) {
-        continue;
-      }
-      File tableDir = new File(context.getFetchIn(), inputTable);
-      FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
-      context.updateAssignedFragments(inputTable, frags);
-    }
-  }
-
-  public void run() throws Exception {
-    startTime = System.currentTimeMillis();
-    Throwable error = null;
-    try {
-      if(!context.isStopped()) {
-        context.setState(TaskAttemptState.TA_RUNNING);
-        if (context.hasFetchPhase()) {
-          // If the fetch is still in progress, the query unit must wait for
-          // complete.
-          waitForFetch();
-          context.setFetcherProgress(FETCHER_PROGRESS);
-          context.setProgressChanged(true);
-          updateProgress();
-        }
-
-        this.executor = executionBlockContext.getTQueryEngine().
-            createPlan(context, plan);
-        this.executor.init();
-
-        while(!context.isStopped() && executor.next() != null) {
-        }
-      }
-    } catch (Throwable e) {
-      error = e ;
-      LOG.error(e.getMessage(), e);
-      stopScriptExecutors();
-      context.stop();
-    } finally {
-      if (executor != null) {
-        try {
-          executor.close();
-          reloadInputStats();
-        } catch (IOException e) {
-          LOG.error(e, e);
-        }
-        this.executor = null;
-      }
-
-      executionBlockContext.completedTasksNum.incrementAndGet();
-      context.getHashShuffleAppenderManager().finalizeTask(taskId);
-
-      QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub();
-      if (context.isStopped()) {
-        context.setExecutorProgress(0.0f);
-
-        if (context.getState() == TaskAttemptState.TA_KILLED) {
-          queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
-          executionBlockContext.killedTasksNum.incrementAndGet();
-        } else {
-          context.setState(TaskAttemptState.TA_FAILED);
-          TaskFatalErrorReport.Builder errorBuilder =
-              TaskFatalErrorReport.newBuilder()
-                  .setId(getId().getProto());
-          if (error != null) {
-            if (error.getMessage() == null) {
-              errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
-            } else {
-              errorBuilder.setErrorMessage(error.getMessage());
-            }
-            errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
-          }
-
-          queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
-          executionBlockContext.failedTasksNum.incrementAndGet();
-        }
-      } else {
-        // if successful
-        context.setProgress(1.0f);
-        context.setState(TaskAttemptState.TA_SUCCEEDED);
-        executionBlockContext.succeededTasksNum.incrementAndGet();
-
-        TaskCompletionReport report = getTaskCompletionReport();
-        queryMasterStub.done(null, report, NullCallback.get());
-      }
-      finishTime = System.currentTimeMillis();
-      LOG.info(context.getTaskId() + " completed. " +
-          "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
-          ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
-          + ", killed: " + executionBlockContext.killedTasksNum.intValue()
-          + ", failed: " + executionBlockContext.failedTasksNum.intValue());
-      cleanupTask();
-    }
-  }
-
-  public void cleanupTask() {
-    TaskHistory taskHistory = createTaskHistory();
-    executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory);
-    executionBlockContext.getTasks().remove(getId());
-
-    fetcherRunners.clear();
-    fetcherRunners = null;
-    try {
-      if(executor != null) {
-        executor.close();
-        executor = null;
-      }
-    } catch (IOException e) {
-      LOG.fatal(e.getMessage(), e);
-    }
-
-    executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory);
-    stopScriptExecutors();
-  }
-
-  public TaskHistory createTaskHistory() {
-    TaskHistory taskHistory = null;
-    try {
-      taskHistory = new TaskHistory(getTaskId(), getStatus(), context.getProgress(),
-          startTime, finishTime, reloadInputStats());
-
-      if (context.getOutputPath() != null) {
-        taskHistory.setOutputPath(context.getOutputPath().toString());
-      }
-
-      if (context.getWorkDir() != null) {
-        taskHistory.setWorkingPath(context.getWorkDir().toString());
-      }
-
-      if (context.getResultStats() != null) {
-        taskHistory.setOutputStats(context.getResultStats().getProto());
-      }
-
-      if (hasFetchPhase()) {
-        taskHistory.setTotalFetchCount(fetcherRunners.size());
-        int i = 0;
-        FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder();
-        for (Fetcher fetcher : fetcherRunners) {
-          // TODO store the fetcher histories
-          if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) {
-            builder.setStartTime(fetcher.getStartTime());
-            builder.setFinishTime(fetcher.getFinishTime());
-            builder.setFileLength(fetcher.getFileLen());
-            builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
-            builder.setState(fetcher.getState());
-
-            taskHistory.addFetcherHistory(builder.build());
-          }
-          if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++;
-        }
-        taskHistory.setFinishedFetchCount(i);
-      }
-    } catch (Exception e) {
-      LOG.warn(e.getMessage(), e);
-    }
-
-    return taskHistory;
-  }
-
-  public int hashCode() {
-    return context.hashCode();
-  }
-
-  public boolean equals(Object obj) {
-    if (obj instanceof Task) {
-      Task other = (Task) obj;
-      return this.context.equals(other.context);
-    }
-    return false;
-  }
-
-  private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
-      throws IOException {
-    Configuration c = new Configuration(systemConf);
-    c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
-    FileSystem fs = FileSystem.get(c);
-    Path tablePath = new Path(file.getAbsolutePath());
-
-    List<FileFragment> listTablets = new ArrayList<FileFragment>();
-    FileFragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus f : fileLists) {
-      if (f.getLen() == 0) {
-        continue;
-      }
-      tablet = new FileFragment(name, f.getPath(), 0l, f.getLen());
-      listTablets.add(tablet);
-    }
-
-    // Special treatment for locally pseudo fetched chunks
-    synchronized (localChunks) {
-      for (FileChunk chunk : localChunks) {
-        if (name.equals(chunk.getEbId())) {
-          tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length());
-          listTablets.add(tablet);
-          LOG.info("One local chunk is added to listTablets");
-        }
-      }
-    }
-
-    FileFragment[] tablets = new FileFragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  private class FetchRunner implements Runnable {
-    private final TaskAttemptContext ctx;
-    private final Fetcher fetcher;
-    private int maxRetryNum;
-
-    public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
-      this.ctx = ctx;
-      this.fetcher = fetcher;
-      this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
-    }
-
-    @Override
-    public void run() {
-      int retryNum = 0;
-      int retryWaitTime = 1000; //sec
-
-      try { // for releasing fetch latch
-        while(!context.isStopped() && retryNum < maxRetryNum) {
-          if (retryNum > 0) {
-            try {
-              Thread.sleep(retryWaitTime);
-              retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2);  // max 10 seconds
-            } catch (InterruptedException e) {
-              LOG.error(e);
-            }
-            LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
-          }
-          try {
-            FileChunk fetched = fetcher.get();
-            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
-          && fetched.getFile() != null) {
-              if (fetched.fromRemote() == false) {
-          localChunks.add(fetched);
-          LOG.info("Add a new FileChunk to local chunk list");
-              }
-              break;
-            }
-          } catch (Throwable e) {
-            LOG.error("Fetch failed: " + fetcher.getURI(), e);
-          }
-          retryNum++;
-        }
-      } finally {
-        if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){
-          fetcherFinished(ctx);
-        } else {
-          if (retryNum == maxRetryNum) {
-            LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
-          }
-          stopScriptExecutors();
-          context.stop(); // retry task
-          ctx.getFetchLatch().countDown();
-        }
-      }
-    }
-  }
-
-  @VisibleForTesting
-  public static float adjustFetchProcess(int totalFetcher, int remainFetcher) {
-    if (totalFetcher > 0) {
-      return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS;
-    } else {
-      return 0.0f;
-    }
-  }
-
-  private synchronized void fetcherFinished(TaskAttemptContext ctx) {
-    int fetcherSize = fetcherRunners.size();
-    if(fetcherSize == 0) {
-      return;
-    }
-
-    ctx.getFetchLatch().countDown();
-
-    int remainFetcher = (int) ctx.getFetchLatch().getCount();
-    if (remainFetcher == 0) {
-      context.setFetcherProgress(FETCHER_PROGRESS);
-    } else {
-      context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher));
-      context.setProgressChanged(true);
-    }
-  }
-
-  private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
-                                        List<FetchImpl> fetches) throws IOException {
-
-    if (fetches.size() > 0) {
-      Path inputDir = executionBlockContext.getLocalDirAllocator().
-          getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
-
-      int i = 0;
-      File storeDir;
-      File defaultStoreFile;
-      FileChunk storeChunk = null;
-      List<Fetcher> runnerList = Lists.newArrayList();
-
-      for (FetchImpl f : fetches) {
-        storeDir = new File(inputDir.toString(), f.getName());
-        if (!storeDir.exists()) {
-          storeDir.mkdirs();
-        }
-
-        for (URI uri : f.getURIs()) {
-          defaultStoreFile = new File(storeDir, "in_" + i);
-          InetAddress address = InetAddress.getByName(uri.getHost());
-
-          WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
-          if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
-            boolean hasError = false;
-            try {
-              LOG.info("Try to get local file chunk at local host");
-              storeChunk = getLocalStoredFileChunk(uri, systemConf);
-            } catch (Throwable t) {
-              hasError = true;
-            }
-
-            // When a range request is out of range, storeChunk will be NULL. This case is normal state.
-            // So, we should skip and don't need to create storeChunk.
-            if (storeChunk == null && !hasError) {
-              continue;
-            }
-
-            if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1
-                && hasError == false) {
-              storeChunk.setFromRemote(false);
-            } else {
-              storeChunk = new FileChunk(defaultStoreFile, 0, -1);
-              storeChunk.setFromRemote(true);
-            }
-          } else {
-            storeChunk = new FileChunk(defaultStoreFile, 0, -1);
-            storeChunk.setFromRemote(true);
-          }
-
-          // If we decide that intermediate data should be really fetched from a remote host, storeChunk
-          // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
-          storeChunk.setEbId(f.getName());
-          Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
-          LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
-          runnerList.add(fetcher);
-          i++;
-        }
-      }
-      ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
-      return runnerList;
-    } else {
-      return Lists.newArrayList();
-    }
-  }
-
-  private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
-    // Parse the URI
-    LOG.info("getLocalStoredFileChunk starts");
-    final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters();
-    final List<String> types = params.get("type");
-    final List<String> qids = params.get("qid");
-    final List<String> taskIdList = params.get("ta");
-    final List<String> stageIds = params.get("sid");
-    final List<String> partIds = params.get("p");
-    final List<String> offsetList = params.get("offset");
-    final List<String> lengthList = params.get("length");
-
-    if (types == null || stageIds == null || qids == null || partIds == null) {
-      LOG.error("Invalid URI - Required queryId, type, stage Id, and part id");
-      return null;
-    }
-
-    if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) {
-      LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id");
-      return null;
-    }
-
-    String queryId = qids.get(0);
-    String shuffleType = types.get(0);
-    String sid = stageIds.get(0);
-    String partId = partIds.get(0);
-
-    if (shuffleType.equals("r") && taskIdList == null) {
-      LOG.error("Invalid URI - For range shuffle, taskId is required");
-      return null;
-    }
-    List<String> taskIds = splitMaps(taskIdList);
-
-    FileChunk chunk = null;
-    long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
-    long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
-
-    LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
-	+ ", taskIds=" + taskIdList);
-
-    // The working directory of Tajo worker for each query, including stage
-    String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/";
-
-    // If the stage requires a range shuffle
-    if (shuffleType.equals("r")) {
-      String ta = taskIds.get(0);
-      if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) {
-        LOG.warn("Range shuffle - file not exist");
-        return null;
-      }
-      Path path = executionBlockContext.getLocalFS().makeQualified(
-	      executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
-      String startKey = params.get("start").get(0);
-      String endKey = params.get("end").get(0);
-      boolean last = params.get("final") != null;
-
-      try {
-        chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last);
-            } catch (Throwable t) {
-        LOG.error("getFileChunks() throws exception");
-        return null;
-      }
-
-      // If the stage requires a hash shuffle or a scattered hash shuffle
-    } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
-      int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
-      String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
-      if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
-        LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
-        return null;
-      }
-      Path path = executionBlockContext.getLocalFS().makeQualified(
-        executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
-      File file = new File(path.toUri());
-      long startPos = (offset >= 0 && length >= 0) ? offset : 0;
-      long readLen = (offset >= 0 && length >= 0) ? length : file.length();
-
-      if (startPos >= file.length()) {
-        LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
-        return null;
-      }
-      chunk = new FileChunk(file, startPos, readLen);
-
-    } else {
-      LOG.error("Unknown shuffle type");
-      return null;
-    }
-
-    return chunk;
-  }
-
-  private List<String> splitMaps(List<String> mapq) {
-    if (null == mapq) {
-      return null;
-    }
-    final List<String> ret = new ArrayList<String>();
-    for (String s : mapq) {
-      Collections.addAll(ret, s.split(","));
-    }
-    return ret;
-  }
-
-  public static Path getTaskAttemptDir(TaskAttemptId quid) {
-    Path workDir =
-        StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
-            String.valueOf(quid.getTaskId().getId()),
-            String.valueOf(quid.getId()));
-    return workDir;
-  }
+  TajoWorkerProtocol.TaskStatusProto getReport();
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 58028ac..d020639 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -46,7 +46,6 @@ import java.io.IOException;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 
@@ -60,13 +59,13 @@ public class TaskAttemptContext {
 
   private volatile TaskAttemptState state;
   private TableStats resultStats;
-  private TaskAttemptId queryId;
+  private TaskAttemptId taskId;
   private final Path workDir;
   private boolean needFetch = false;
   private CountDownLatch doneFetchPhaseSignal;
   private float progress = 0.0f;
   private float fetcherProgress = 0.0f;
-  private AtomicBoolean progressChanged = new AtomicBoolean(false);
+  private volatile boolean progressChanged;
 
   /** a map of shuffled file outputs */
   private Map<Integer, String> shuffleFileOutputs;
@@ -87,7 +86,7 @@ public class TaskAttemptContext {
   private EvalContext evalContext = new EvalContext();
 
   public TaskAttemptContext(QueryContext queryContext, final ExecutionBlockContext executionBlockContext,
-                            final TaskAttemptId queryId,
+                            final TaskAttemptId taskId,
                             final FragmentProto[] fragments,
                             final Path workDir) {
     this.queryContext = queryContext;
@@ -97,7 +96,7 @@ public class TaskAttemptContext {
       this.sharedResource = executionBlockContext.getSharedResource();
     }
 
-    this.queryId = queryId;
+    this.taskId = taskId;
 
     if (fragments != null) {
       for (FragmentProto t : fragments) {
@@ -114,25 +113,15 @@ public class TaskAttemptContext {
     this.workDir = workDir;
     this.shuffleFileOutputs = Maps.newHashMap();
 
-    state = TaskAttemptState.TA_PENDING;
+    this.state = TaskAttemptState.TA_PENDING;
 
     this.partitionOutputVolume = Maps.newHashMap();
-
-    if (workerContext != null) {
-      this.hashShuffleAppenderManager = workerContext.getHashShuffleAppenderManager();
-    } else {
-      try {
-        this.hashShuffleAppenderManager = new HashShuffleAppenderManager(queryContext.getConf());
-      } catch (IOException e) {
-        LOG.error(e.getMessage(), e);
-      }
-    }
   }
 
   @VisibleForTesting
-  public TaskAttemptContext(final QueryContext queryContext, final TaskAttemptId queryId,
+  public TaskAttemptContext(final QueryContext queryContext, final TaskAttemptId taskAttemptId,
                             final Fragment [] fragments,  final Path workDir) {
-    this(queryContext, null, queryId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
+    this(queryContext, null, taskAttemptId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
   }
 
   public TajoConf getConf() {
@@ -308,9 +297,10 @@ public class TaskAttemptContext {
   public Path getWorkDir() {
     return this.workDir;
   }
-  
+
+  //TODO change to getTaskAttemptId()
   public TaskAttemptId getTaskId() {
-    return this.queryId;
+    return this.taskId;
   }
   
   public float getProgress() {
@@ -326,17 +316,11 @@ public class TaskAttemptContext {
       this.progress = progress;
     }
 
-    if (previousProgress != progress) {
-      setProgressChanged(true);
-    }
+    this.progressChanged = previousProgress != progress;
   }
 
   public boolean isProgressChanged() {
-    return progressChanged.get();
-  }
-
-  public void setProgressChanged(boolean changed){
-    progressChanged.set(changed);
+    return progressChanged;
   }
 
   public void setExecutorProgress(float executorProgress) {
@@ -355,7 +339,9 @@ public class TaskAttemptContext {
     if(Float.isNaN(fetcherProgress) || Float.isInfinite(fetcherProgress)){
       fetcherProgress = 0.0f;
     }
+    float previousProgress = this.fetcherProgress;
     this.fetcherProgress = fetcherProgress;
+    this.progressChanged = previousProgress != fetcherProgress;
   }
 
   public FragmentProto getTable(String id) {
@@ -383,13 +369,13 @@ public class TaskAttemptContext {
   }
   
   public int hashCode() {
-    return Objects.hashCode(queryId);
+    return Objects.hashCode(taskId);
   }
   
   public boolean equals(Object obj) {
     if (obj instanceof TaskAttemptContext) {
       TaskAttemptContext other = (TaskAttemptContext) obj;
-      return queryId.equals(other.getTaskId());
+      return taskId.equals(other.getTaskId());
     } else {
       return false;
     }
@@ -399,11 +385,18 @@ public class TaskAttemptContext {
     return queryContext;
   }
 
-  public TaskAttemptId getQueryId() {
-    return queryId;
-  }
-
   public HashShuffleAppenderManager getHashShuffleAppenderManager() {
+    if(hashShuffleAppenderManager == null) {
+      if (workerContext != null) {
+        this.hashShuffleAppenderManager = workerContext.getHashShuffleAppenderManager();
+      } else {
+        try {
+          this.hashShuffleAppenderManager = new HashShuffleAppenderManager(queryContext.getConf());
+        } catch (IOException e) {
+          LOG.error(e.getMessage(), e);
+        }
+      }
+    }
     return hashShuffleAppenderManager;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
new file mode 100644
index 0000000..2576726
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.TajoProtos;
+
+/**
+ * A long-lived worker loop that drives Tajo tasks handed out by a {@link TaskExecutor}.
+ *
+ * Each container repeatedly takes the next task from the executor and runs it through
+ * init, the optional fetch phase, run and cleanup. Whatever the outcome, the executor is
+ * asked to stop the task once the container is done with it.
+ */
+public class TaskContainer implements Runnable {
+  private static final Log LOG = LogFactory.getLog(TaskContainer.class);
+
+  private final TaskExecutor executor;
+  private final int sequenceId;
+
+  /**
+   * @param sequenceId index of this container; it only appears in debug logging
+   * @param executor   the executor that supplies tasks and is notified when each task ends
+   */
+  public TaskContainer(int sequenceId, TaskExecutor executor) {
+    this.sequenceId = sequenceId;
+    this.executor = executor;
+  }
+
+  /**
+   * Processes tasks one after another until the executor reports that it has been stopped.
+   */
+  @Override
+  public void run() {
+    while (!executor.isStopped()) {
+
+      Task task = null;
+      try {
+        task = executor.getNextTask();
+        if (task == null) {
+          // No task was handed out (getNextTask() returns null when its wait is interrupted).
+          // Re-check the loop condition instead of dereferencing null.
+          continue;
+        }
+
+        task.getExecutionBlockContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sequenceId + TaskContainer.class.getSimpleName() +
+              " got task:" + task.getTaskContext().getTaskId());
+        }
+
+        TaskAttemptContext taskAttemptContext = task.getTaskContext();
+        if (taskAttemptContext.isStopped()) {
+          // The attempt was stopped before it started. Skip it, but keep this container alive:
+          // returning here would end the thread and leave the executor with one worker less.
+          // The finally block below still releases the task.
+          continue;
+        }
+
+        task.init();
+
+        if (task.hasFetchPhase()) {
+          task.fetch(); // The fetch is performed in an asynchronous way.
+        }
+
+        if (!taskAttemptContext.isStopped()) {
+          task.run();
+        }
+
+        task.cleanup();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        if (task != null) {
+          try {
+            task.abort();
+            task.getExecutionBlockContext().fatalError(task.getTaskContext().getTaskId(), e.getMessage());
+          } catch (Throwable t) {
+            // Failure while reporting the failure; nothing more can be done than logging it.
+            LOG.fatal(t.getMessage(), t);
+          }
+        }
+      } finally {
+        if (task != null) {
+          executor.stopTask(task.getTaskContext().getTaskId());
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
new file mode 100644
index 0000000..299952e
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.query.TaskRequest;
+import org.apache.tajo.engine.query.TaskRequestImpl;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.worker.event.*;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Runs the tasks assigned to this worker.
+ *
+ * On start, one {@link TaskContainer} per configured CPU core
+ * ({@link ConfVars#WORKER_RESOURCE_AVAILABLE_CPU_CORES}) is submitted to a fixed thread pool.
+ * The containers pull tasks from an internal queue that is filled by
+ * {@link #handle(TaskExecutorEvent)}. A second executor is provided for the fetchers of tasks.
+ */
+public class TaskExecutor extends AbstractService implements EventHandler<TaskExecutorEvent> {
+  private static final Log LOG = LogFactory.getLog(TaskExecutor.class);
+
+  private final TaskManager taskManager;
+  private final EventHandler rmEventHandler;
+  /** Resource granted to each accepted task attempt; released again in {@link #stopTask}. */
+  private final Map<TaskAttemptId, NodeResource> allocatedResourceMap;
+  private final BlockingQueue<Task> taskQueue;
+  /** Number of accepted tasks that have not been stopped yet (queued or executing). */
+  private final AtomicInteger runningTasks;
+  private ThreadPoolExecutor fetcherExecutor;
+  private ExecutorService threadPool;
+  private TajoConf tajoConf;
+  private volatile boolean isStopped;
+
+  /**
+   * @param taskManager    provides the dispatcher to register with and the execution block contexts
+   * @param rmEventHandler receives a {@link NodeResourceDeallocateEvent} whenever a resource is released
+   */
+  public TaskExecutor(TaskManager taskManager, EventHandler rmEventHandler) {
+    super(TaskExecutor.class.getName());
+    this.taskManager = taskManager;
+    this.rmEventHandler = rmEventHandler;
+    this.allocatedResourceMap = Maps.newConcurrentMap();
+    this.runningTasks = new AtomicInteger();
+    this.taskQueue = new LinkedBlockingQueue<Task>();
+  }
+
+  /**
+   * Keeps the configuration and registers this executor for {@link TaskExecutorEvent}s.
+   *
+   * @throws IllegalArgumentException if <code>conf</code> is not a {@link TajoConf}
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    if (!(conf instanceof TajoConf)) {
+      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+    }
+
+    this.tajoConf = (TajoConf) conf;
+    this.taskManager.getDispatcher().register(TaskExecutorEvent.EventType.class, this);
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Creates the task and fetcher thread pools and starts one {@link TaskContainer} per task thread.
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    int nThreads = this.tajoConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
+    this.threadPool = Executors.newFixedThreadPool(nThreads,
+        new ThreadFactoryBuilder().setNameFormat("Task executor #%d").build());
+
+    //TODO move to tajoConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM);
+    int maxFetcherThreads = Runtime.getRuntime().availableProcessors() * 2;
+    // A bounded pool behind a SynchronousQueue rejects every submission that arrives while all
+    // of its threads are busy. Queue surplus fetchers instead, so they wait for a free thread;
+    // allowCoreThreadTimeOut keeps the 60 second expiry of idle threads.
+    this.fetcherExecutor = new ThreadPoolExecutor(maxFetcherThreads,
+        maxFetcherThreads,
+        60L, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>());
+    this.fetcherExecutor.allowCoreThreadTimeOut(true);
+
+
+    for (int i = 0; i < nThreads; i++) {
+      threadPool.submit(new TaskContainer(i, this));
+    }
+
+    super.serviceStart();
+    LOG.info("Started TaskExecutor[" + nThreads + "], Fetcher executor[" + maxFetcherThreads + "]");
+  }
+
+  /**
+   * Marks the executor as stopped and shuts both thread pools down.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    isStopped = true;
+
+    // A service may be stopped without ever having been started, so the pools can still be null.
+    if (threadPool != null) {
+      // Idle containers are blocked in taskQueue.take(); a plain shutdown() would never wake
+      // them up. The interrupt sent by shutdownNow() lets them observe isStopped and exit.
+      threadPool.shutdownNow();
+    }
+    if (fetcherExecutor != null) {
+      fetcherExecutor.shutdown();
+    }
+    super.serviceStop();
+  }
+
+  public boolean isStopped() {
+    return isStopped;
+  }
+
+  public int getRunningTasks() {
+    return runningTasks.get();
+  }
+
+  /**
+   * This will block until a task is available.
+   *
+   * @return the next queued task, or <code>null</code> if the wait was interrupted
+   */
+  protected Task getNextTask() {
+    Task task = null;
+    try {
+      task = taskQueue.take();
+    } catch (InterruptedException e) {
+      // The null return value is the signal to the caller. The interrupt flag is deliberately
+      // not restored: the container loop calls this method again right away and would spin.
+      // An interrupt during shutdown is expected and not worth a fatal log entry.
+      if (!isStopped) {
+        LOG.fatal(e.getMessage(), e);
+      }
+    }
+    return task;
+  }
+
+  /**
+   * Ends the accounting of an accepted task: decrements the running-task counter and gives the
+   * resource recorded for the attempt back to the resource manager.
+   *
+   * @param taskId the attempt that has ended
+   */
+  protected void stopTask(TaskAttemptId taskId) {
+    runningTasks.decrementAndGet();
+    releaseResource(allocatedResourceMap.remove(taskId));
+  }
+
+  /**
+   * Sends a deallocation event for the given resource; does nothing if there is no resource.
+   */
+  @SuppressWarnings("unchecked")
+  private void releaseResource(NodeResource resource) {
+    if (resource != null) {
+      rmEventHandler.handle(new NodeResourceDeallocateEvent(resource));
+    }
+  }
+
+  protected ExecutorService getFetcherExecutor() {
+    return fetcherExecutor;
+  }
+
+
+  /**
+   * Creates the task for a request and registers it in the execution block context.
+   *
+   * If a task with the same attempt id is already registered, a fatal error is reported to the
+   * execution block context and no task is created.
+   *
+   * @return the new task, or <code>null</code> for a duplicate attempt
+   * @throws IOException if the task cannot be constructed
+   */
+  protected Task createTask(ExecutionBlockContext executionBlockContext,
+                            TajoWorkerProtocol.TaskRequestProto taskRequest) throws IOException {
+    Task task = null;
+    TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId());
+    if (executionBlockContext.getTasks().containsKey(taskAttemptId)) {
+      String errorMessage = "Duplicate Task Attempt: " + taskAttemptId;
+      LOG.error(errorMessage);
+      executionBlockContext.fatalError(taskAttemptId, errorMessage);
+    } else {
+      task = new TaskImpl(new TaskRequestImpl(taskRequest), executionBlockContext, getFetcherExecutor());
+      executionBlockContext.getTasks().put(task.getTaskContext().getTaskId(), task);
+    }
+    return task;
+  }
+
+  /**
+   * Handles a {@link TaskStartEvent}: creates the task and queues it for the containers.
+   * Other event types are ignored.
+   *
+   * The allocated resource is recorded and the running-task counter incremented only for a
+   * task that is really accepted. When the task cannot be created (duplicate attempt or
+   * I/O failure) the resource of the event is handed straight back, without touching the
+   * counter or the entry of an attempt that is already running under the same id.
+   */
+  @Override
+  public void handle(TaskExecutorEvent event) {
+
+    if (!(event instanceof TaskStartEvent)) {
+      return;
+    }
+
+    TaskStartEvent startEvent = (TaskStartEvent) event;
+    TaskAttemptId taskId = startEvent.getTaskId();
+    NodeResource resource = startEvent.getAllocatedResource();
+
+    ExecutionBlockContext context = taskManager.getExecutionBlockContext(
+        taskId.getTaskId().getExecutionBlockId());
+
+    Task task;
+    try {
+      task = createTask(context, startEvent.getTaskRequest());
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+      releaseResource(resource);
+      return;
+    }
+
+    if (task == null) {
+      LOG.warn("Release duplicate task resource: " + resource);
+      releaseResource(resource);
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Arrival task: " + task.getTaskContext().getTaskId() +
+          ", allocated resource: " + resource);
+    }
+
+    // Account for the task before a container can pick it up and call stopTask() for it.
+    allocatedResourceMap.put(taskId, resource);
+    runningTasks.incrementAndGet();
+    try {
+      taskQueue.put(task);
+    } catch (InterruptedException e) {
+      // The task never reached the queue: undo the accounting and release its resource.
+      stopTask(taskId);
+      Thread.currentThread().interrupt();
+      if (!isStopped) {
+        LOG.fatal(e.getMessage(), e);
+      }
+      return;
+    }
+
+    context.getWorkerContext().getWorkerSystemMetrics()
+        .histogram("tasks", "running").update(runningTasks.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
new file mode 100644
index 0000000..be3960b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -0,0 +1,838 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.engine.query.TaskRequest;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.plan.function.python.TajoScriptEngine;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.NetUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
+
+public class TaskImpl implements Task {
+  private static final Log LOG = LogFactory.getLog(TaskImpl.class);
+  private static final float FETCHER_PROGRESS = 0.5f;
+
+  private final TajoConf systemConf;
+  private final QueryContext queryContext;
+  private final ExecutionBlockContext executionBlockContext;
+  private final TaskRequest request;
+  private final Map<String, TableDesc> descs;
+  private final TableStats inputStats;
+  private final ExecutorService fetcherExecutor;
+  private final Path taskDir;
+
+  private final TaskAttemptContext context;
+  private List<Fetcher> fetcherRunners;
+  private LogicalNode plan;
+  private PhysicalExec executor;
+
+  private boolean interQuery;
+  private Path inputTableBaseDir;
+
+  private long startTime;
+  private long endTime;
+
+  private List<FileChunk> localChunks;
+  // TODO - to be refactored
+  private ShuffleType shuffleType = null;
+  private Schema finalSchema = null;
+
+  private TupleComparator sortComp = null;
+
+  /**
+   * Creates a task for one task attempt of an execution block.
+   *
+   * The constructor creates the base directory of the execution block, derives the task
+   * directory (<code>taskId_attemptId</code>) below it and builds the {@link TaskAttemptContext},
+   * which starts in the {@link TaskAttemptState#TA_PENDING} state.
+   *
+   * @param request               the task request received by the worker
+   * @param executionBlockContext the context of the execution block this task belongs to
+   * @param fetcherExecutor       the executor on which the fetchers of this task are run
+   * @throws IOException propagated from the creation of the base directory or the query context
+   */
+  public TaskImpl(final TaskRequest request,
+                  final ExecutionBlockContext executionBlockContext,
+                  final ExecutorService fetcherExecutor) throws IOException {
+
+    this.executionBlockContext = executionBlockContext;
+    this.request = request;
+    this.fetcherExecutor = fetcherExecutor;
+    this.systemConf = executionBlockContext.getConf();
+    this.queryContext = request.getQueryContext(systemConf);
+    this.descs = Maps.newHashMap();
+    this.fetcherRunners = Lists.newArrayList();
+    this.inputStats = new TableStats();
+
+    final Path baseDir = executionBlockContext.createBaseDir();
+    LOG.info("Task basedir is created (" + baseDir +")");
+
+    final TaskAttemptId attemptId = request.getId();
+    final String taskDirName = attemptId.getTaskId().getId() + "_" + attemptId.getId();
+    this.taskDir = StorageUtil.concatPath(baseDir, taskDirName);
+
+    final FragmentProto[] fragments =
+        request.getFragments().toArray(new FragmentProto[request.getFragments().size()]);
+    this.context = new TaskAttemptContext(queryContext, executionBlockContext, attemptId, fragments, taskDir);
+    this.context.setDataChannel(request.getDataChannel());
+    this.context.setEnforcer(request.getEnforcer());
+    this.context.setState(TaskAttemptState.TA_PENDING);
+  }
+
+  /**
+   * Deserializes the logical plan of the request and prepares everything derived from it.
+   *
+   * The table descriptions of all scan and partitioned-scan nodes are collected into
+   * {@link #descs}. For an inter-query task the shuffle type is taken from the data channel
+   * (and, for range shuffle, the sort schema and comparator are built); otherwise the output
+   * path of the task is set on the context. Finally a summary of the task is logged.
+   *
+   * @throws IOException declared for callers; propagated from the calls made here
+   */
+  public void initPlan() throws IOException {
+    plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan());
+
+    final LogicalNode[] scans = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
+    if (scans != null) {
+      for (LogicalNode each : scans) {
+        final ScanNode scanNode = (ScanNode) each;
+        descs.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+      }
+    }
+
+    final LogicalNode[] partitionScans = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN);
+    if (partitionScans != null) {
+      for (LogicalNode each : partitionScans) {
+        final PartitionedTableScanNode partitionScanNode = (PartitionedTableScanNode) each;
+        descs.put(partitionScanNode.getCanonicalName(), partitionScanNode.getTableDesc());
+      }
+    }
+
+    interQuery = request.getProto().getInterQuery();
+    if (!interQuery) {
+      // A final (non inter-query) task writes its result below the staging directory.
+      final FileTablespace tablespace = (FileTablespace) TableSpaceManager.getFileStorageManager(systemConf);
+      final Path outFilePath = tablespace.getAppenderFilePath(getId(), queryContext.getStagingDir());
+      LOG.info("Output File Path: " + outFilePath);
+      context.setOutputPath(outFilePath);
+    } else {
+      context.setInterQuery();
+      this.shuffleType = context.getDataChannel().getShuffleType();
+
+      if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
+        final SortNode topSort = PlannerUtil.findTopNode(plan, NodeType.SORT);
+        this.finalSchema = PlannerUtil.sortSpecsToSchema(topSort.getSortKeys());
+        this.sortComp = new BaseTupleComparator(finalSchema, topSort.getSortKeys());
+      }
+    }
+
+    this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>());
+
+    final String shuffleInfo = interQuery ? ", Use " + this.shuffleType + " shuffle" : "";
+    LOG.info("==================================");
+    LOG.info("* Stage " + request.getId() + " is initialized");
+    LOG.info("* InterQuery: " + interQuery + shuffleInfo +
+        ", Fragments (num: " + request.getFragments().size() + ")" +
+        ", Fetches (total:" + request.getFetches().size() + ") :");
+
+    if (LOG.isDebugEnabled()) {
+      for (FetchImpl fetch : request.getFetches()) {
+        LOG.debug("Table Id: " + fetch.getName() + ", Simple URIs: " + fetch.getSimpleURIs());
+      }
+    }
+    LOG.info("* Local task dir: " + taskDir);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("* plan:\n");
+      LOG.debug(plan.toString());
+    }
+    LOG.info("==================================");
+  }
+
+  /**
+   * Starts every script engine registered in the evaluation context of this task,
+   * passing each one the system configuration.
+   *
+   * @throws IOException propagated from the start of a script engine
+   */
+  private void startScriptExecutors() throws IOException {
+    for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
+      executor.start(systemConf);
+    }
+  }
+
+  /**
+   * Shuts down every script engine registered in the evaluation context of this task.
+   */
+  private void stopScriptExecutors() {
+    for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Initializes the task: builds the plan, starts the script engines and, while the attempt is
+   * still pending, creates the local working directories and the fetchers for its inputs.
+   *
+   * @throws IOException propagated from plan initialization or from the local file system
+   */
+  @Override
+  public void init() throws IOException {
+    LOG.info("Initializing: " + getId());
+
+    initPlan();
+    startScriptExecutors();
+
+    if (context.getState() != TaskAttemptState.TA_PENDING) {
+      // Directories and fetchers are only prepared for an attempt that has not progressed yet.
+      return;
+    }
+
+    // initialize a task temporal dir
+    final FileSystem localFileSystem = executionBlockContext.getLocalFS();
+    localFileSystem.mkdirs(taskDir);
+
+    if (!request.getFetches().isEmpty()) {
+      final String attemptDir = getTaskAttemptDir(context.getTaskId()).toString();
+      inputTableBaseDir = localFileSystem.makeQualified(
+          executionBlockContext.getLocalDirAllocator().getLocalPathForWrite(attemptDir, systemConf));
+      localFileSystem.mkdirs(inputTableBaseDir);
+
+      // one sub directory per input table
+      for (String inputTable : context.getInputTables()) {
+        final Path inputTableDir = new Path(inputTableBaseDir, inputTable);
+        if (!localFileSystem.exists(inputTableDir)) {
+          LOG.info("the directory is created  " + inputTableDir.toUri());
+          localFileSystem.mkdirs(inputTableDir);
+        }
+      }
+    }
+
+    // for localizing the intermediate data
+    fetcherRunners.addAll(getFetchRunners(context, request.getFetches()));
+  }
+
+  /**
+   * @return the id of the task attempt, as held by the task attempt context
+   */
+  private TaskAttemptId getId() {
+    return context.getTaskId();
+  }
+
+  /**
+   * @return a short description consisting of the task attempt id and the current attempt state
+   */
+  public String toString() {
+    return "TaskId: " + this.getId() + " Status: " + context.getState();
+  }
+
+  /**
+   * @return whether the task attempt context of this task has been stopped
+   */
+  @Override
+  public boolean isStopped() {
+    return context.isStopped();
+  }
+
+  /**
+   * @return the task attempt context created for this task in the constructor
+   */
+  @Override
+  public TaskAttemptContext getTaskContext() {
+    return context;
+  }
+
+  /**
+   * @return the context of the execution block this task was created for
+   */
+  @Override
+  public ExecutionBlockContext getExecutionBlockContext() {
+    return executionBlockContext;
+  }
+
+  /**
+   * @return <code>true</code> if at least one fetcher has been prepared for this task
+   */
+  @Override
+  public boolean hasFetchPhase() {
+    return !fetcherRunners.isEmpty();
+  }
+
+  /**
+   * Submits one {@code FetchRunner} per prepared fetcher to the fetcher executor.
+   * The method does not wait for the submitted work.
+   */
+  @Override
+  public void fetch() {
+    for (Fetcher fetcher : fetcherRunners) {
+      FetchRunner runner = new FetchRunner(context, fetcher);
+      fetcherExecutor.submit(runner);
+    }
+  }
+
+  /**
+   * Kills the task: shuts the script engines down, marks the attempt as
+   * {@link TaskAttemptState#TA_KILLED} and stops the task attempt context.
+   */
+  @Override
+  public void kill() {
+    stopScriptExecutors();
+    context.setState(TaskAttemptState.TA_KILLED);
+    context.stop();
+  }
+
+  /**
+   * Aborts the task: shuts the script engines down, marks the attempt as
+   * {@link TaskAttemptState#TA_FAILED} and stops the task attempt context.
+   */
+  @Override
+  public void abort() {
+    stopScriptExecutors();
+    context.setState(TaskAttemptState.TA_FAILED);
+    context.stop();
+  }
+
+  /**
+   * Builds a status report of this task attempt.
+   *
+   * The report carries the worker name, the attempt id, progress, state and the refreshed input
+   * statistics. Result statistics are added only when the context has them.
+   *
+   * @return the status of this task attempt as a protobuf message
+   */
+  @Override
+  public TaskStatusProto getReport() {
+    final String workerName =
+        executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort();
+
+    final TaskStatusProto.Builder report = TaskStatusProto.newBuilder()
+        .setWorkerName(workerName)
+        .setId(context.getTaskId().getProto())
+        .setProgress(context.getProgress())
+        .setState(context.getState())
+        .setInputStats(reloadInputStats());
+
+    final TableStats resultStats = context.getResultStats();
+    if (resultStats != null) {
+      report.setResultStats(resultStats.getProto());
+    }
+    return report.build();
+  }
+
+  /**
+   * @return the progress-changed flag of the task attempt context
+   */
+  @Override
+  public boolean isProgressChanged() {
+    return context.isProgressChanged();
+  }
+
+  /**
+   * Copies the progress of the physical executor into the task attempt context.
+   *
+   * Nothing happens when the context is stopped, when no executor is set, or when the
+   * progress of the context has already reached 1.0.
+   */
+  @Override
+  public void updateProgress() {
+    // context is a final field assigned in the constructor, so it needs no null check.
+    if (context.isStopped()) {
+      return;
+    }
+
+    // Read the field once: run() resets it to null when it finishes, so the executor that was
+    // checked must also be the one that is queried.
+    final PhysicalExec currentExecutor = this.executor;
+    if (currentExecutor != null && context.getProgress() < 1.0f) {
+      context.setExecutorProgress(currentExecutor.getProgress());
+    }
+  }
+
+  /**
+   * Refreshes the input statistics from the physical executor, if one is currently set and
+   * provides statistics, and returns them as a protobuf message.
+   *
+   * @return the input statistics of this task; the last known values when no executor is set
+   */
+  private CatalogProtos.TableStatsProto reloadInputStats() {
+    synchronized(inputStats) {
+      // Read the field once: run() resets it to null when it finishes. Checking the field and
+      // then dereferencing it again could hit null if that happens in between.
+      final PhysicalExec currentExecutor = this.executor;
+
+      if (currentExecutor != null) {
+        TableStats executorInputStats = currentExecutor.getInputStats();
+        if (executorInputStats != null) {
+          inputStats.setValues(executorInputStats);
+        }
+      }
+      return inputStats.getProto();
+    }
+  }
+
+  /**
+   * Builds the completion report that is sent to the query master for a succeeded task.
+   *
+   * The report contains the attempt id, the refreshed input statistics, the result statistics
+   * (empty statistics when the context has none) and one entry per shuffle file output. The
+   * volume of an entry is set when a volume is recorded for its partition id.
+   *
+   * @return the completion report of this task attempt
+   */
+  private TaskCompletionReport getTaskCompletionReport() {
+    TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder();
+    builder.setId(context.getTaskId().getProto());
+
+    builder.setInputStats(reloadInputStats());
+
+    if (context.hasResultStats()) {
+      builder.setResultStats(context.getResultStats().getProto());
+    } else {
+      builder.setResultStats(new TableStats().getProto());
+    }
+
+    // Look the volume up by partition id instead of scanning the whole map for every output.
+    final Map<Integer, Long> partitionVolumes = context.getPartitionOutputVolume();
+
+    Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
+    while (it.hasNext()) {
+      Entry<Integer, String> entry = it.next();
+      ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
+      part.setPartId(entry.getKey());
+
+      // Set output volume
+      if (partitionVolumes != null) {
+        Long volume = partitionVolumes.get(entry.getKey());
+        if (volume != null) {
+          part.setVolume(volume.longValue());
+        }
+      }
+
+      builder.addShuffleFileOutputs(part.build());
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Blocks until the fetch latch of the context is released, then registers the fetched data
+   * of every non-broadcast input table as the fragments assigned to that table.
+   *
+   * @throws InterruptedException if the thread is interrupted while waiting on the latch
+   * @throws IOException propagated from localizing the fetched data
+   */
+  private void waitForFetch() throws InterruptedException, IOException {
+    context.getFetchLatch().await();
+    LOG.info(context.getTaskId() + " All fetches are done!");
+    final Collection<String> inputTables = Lists.newArrayList(context.getInputTables());
+
+    // Get all broadcasted tables
+    final Set<String> broadcastTables = new HashSet<String>();
+    final List<EnforceProperty> broadcastProperties =
+        context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST);
+    if (broadcastProperties != null) {
+      for (EnforceProperty property : broadcastProperties) {
+        broadcastTables.add(property.getBroadcast().getTableName());
+      }
+    }
+
+    // localize the fetched data and skip the broadcast table
+    for (String tableName : inputTables) {
+      if (!broadcastTables.contains(tableName)) {
+        final File fetchedTableDir = new File(context.getFetchIn(), tableName);
+        final FileFragment[] fragments =
+            localizeFetchedData(fetchedTableDir, tableName, descs.get(tableName).getMeta());
+        context.updateAssignedFragments(tableName, fragments);
+      }
+    }
+  }
+
+  /**
+   * Executes the task and reports its outcome to the query master.
+   *
+   * Unless the context is already stopped, the attempt is set to TA_RUNNING, the fetch phase
+   * (if any) is awaited, and the physical executor is created, initialized and drained. Any
+   * throwable is logged and stops the context. The finally block closes the executor, updates
+   * the task counters of the execution block and sends a status update (killed), a fatal
+   * error (failed) or a completion report (succeeded) to the query master.
+   *
+   * @throws Exception declared by the interface; throwables raised inside the try block are
+   *                   caught and reported as a task failure
+   */
+  @Override
+  public void run() throws Exception {
+    startTime = System.currentTimeMillis();
+    Throwable error = null;
+
+    try {
+      if(!context.isStopped()) {
+        context.setState(TajoProtos.TaskAttemptState.TA_RUNNING);
+        if (context.hasFetchPhase()) {
+          // If the fetch is still in progress, the query unit must wait for complete.
+          waitForFetch();
+          context.setFetcherProgress(FETCHER_PROGRESS);
+          updateProgress();
+        }
+
+        this.executor = executionBlockContext.getTQueryEngine().createPlan(context, plan);
+        this.executor.init();
+
+        // Drain the executor; the loop ends when it is exhausted or the context is stopped.
+        while(!context.isStopped() && executor.next() != null) {
+        }
+      }
+    } catch (Throwable e) {
+      // Remember the cause for the fatal error report built in the finally block.
+      error = e ;
+      LOG.error(e.getMessage(), e);
+      stopScriptExecutors();
+      context.stop();
+    } finally {
+      if (executor != null) {
+        try {
+          executor.close();
+          // Take a last snapshot of the input statistics before the executor is dropped.
+          reloadInputStats();
+        } catch (IOException e) {
+          LOG.error(e, e);
+        }
+        this.executor = null;
+      }
+
+      // Counted for every task that reaches this point, whatever its outcome.
+      executionBlockContext.completedTasksNum.incrementAndGet();
+      context.getHashShuffleAppenderManager().finalizeTask(getId());
+
+      QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub();
+      // A stopped context at this point means the task was killed or has failed.
+      if (context.isStopped()) {
+        context.setExecutorProgress(0.0f);
+
+        if (context.getState() == TaskAttemptState.TA_KILLED) {
+          queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
+          executionBlockContext.killedTasksNum.incrementAndGet();
+        } else {
+          context.setState(TaskAttemptState.TA_FAILED);
+          TaskFatalErrorReport.Builder errorBuilder =
+              TaskFatalErrorReport.newBuilder()
+                  .setId(getId().getProto());
+          if (error != null) {
+            // Fall back to the class name when the throwable carries no message.
+            if (error.getMessage() == null) {
+              errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
+            } else {
+              errorBuilder.setErrorMessage(error.getMessage());
+            }
+            errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
+          }
+
+          queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
+          executionBlockContext.failedTasksNum.incrementAndGet();
+        }
+      } else {
+        // if successful
+        context.stop();
+        context.setProgress(1.0f);
+        context.setState(TaskAttemptState.TA_SUCCEEDED);
+        executionBlockContext.succeededTasksNum.incrementAndGet();
+
+        TaskCompletionReport report = getTaskCompletionReport();
+        queryMasterStub.done(null, report, NullCallback.get());
+      }
+      endTime = System.currentTimeMillis();
+      LOG.info(context.getTaskId() + " completed. " +
+          "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
+          ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
+          + ", killed: " + executionBlockContext.killedTasksNum.intValue()
+          + ", failed: " + executionBlockContext.failedTasksNum.intValue());
+    }
+  }
+
+  /**
+   * Releases what this task still holds after it has run.
+   *
+   * <p>A history snapshot is taken first and registered with the execution block, the task is
+   * removed from the block's task map, the fetcher list is cleared and dropped, a still-open
+   * executor is closed, the history is handed to the worker's history writer and finally the
+   * script executors are stopped.
+   */
+  @Override
+  public void cleanup() {
+    TaskHistory history = createTaskHistory();
+    executionBlockContext.addTaskHistory(getId().getTaskId(), history);
+    executionBlockContext.getTasks().remove(getId());
+
+    fetcherRunners.clear();
+    fetcherRunners = null;
+
+    if (executor != null) {
+      try {
+        executor.close();
+        executor = null;
+      } catch (IOException closeFailure) {
+        // A failed close is logged only; the remaining cleanup steps must still run.
+        LOG.fatal(closeFailure.getMessage(), closeFailure);
+      }
+    }
+
+    executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(history);
+    stopScriptExecutors();
+  }
+
+  /**
+   * Builds a {@link TaskHistory} snapshot from the current state of this task.
+   *
+   * <p>The snapshot carries the task id, state, progress, start/end time and input statistics,
+   * plus the output path, working directory and output statistics when the context has them.
+   * For a task with a fetch phase it also records the total and the finished fetch counts and,
+   * when the debug flag is enabled, one history entry per fetcher.
+   *
+   * @return the history; it may be only partially filled, or null, when an exception occurred
+   *         while building it (the exception is logged and not rethrown)
+   */
+  public TaskHistory createTaskHistory() {
+    TaskHistory taskHistory = null;
+    try {
+      taskHistory = new TaskHistory(context.getTaskId(), context.getState(), context.getProgress(),
+          startTime, endTime, reloadInputStats());
+
+      if (context.getOutputPath() != null) {
+        taskHistory.setOutputPath(context.getOutputPath().toString());
+      }
+
+      if (context.getWorkDir() != null) {
+        taskHistory.setWorkingPath(context.getWorkDir().toString());
+      }
+
+      if (context.getResultStats() != null) {
+        taskHistory.setOutputStats(context.getResultStats().getProto());
+      }
+
+      if (hasFetchPhase()) {
+        taskHistory.setTotalFetchCount(fetcherRunners.size());
+
+        // Loop-invariant: read the debug flag once instead of once per fetcher.
+        final boolean debugEnabled = systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED);
+        int finishedFetchCount = 0;
+        // One builder is reused; every field is overwritten before each build().
+        FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder();
+        for (Fetcher fetcher : fetcherRunners) {
+          // TODO store the fetcher histories
+          if (debugEnabled) {
+            builder.setStartTime(fetcher.getStartTime());
+            builder.setFinishTime(fetcher.getFinishTime());
+            builder.setFileLength(fetcher.getFileLen());
+            builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
+            builder.setState(fetcher.getState());
+
+            taskHistory.addFetcherHistory(builder.build());
+          }
+          if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) {
+            finishedFetchCount++;
+          }
+        }
+        taskHistory.setFinishedFetchCount(finishedFetchCount);
+      }
+    } catch (Exception e) {
+      LOG.warn(e.getMessage(), e);
+    }
+
+    return taskHistory;
+  }
+
+  /**
+   * Returns the hash code of the task attempt context, which is also the only state compared
+   * by {@link #equals(Object)}.
+   */
+  @Override
+  public int hashCode() {
+    return context.hashCode();
+  }
+
+  /**
+   * Two tasks are equal when their task attempt contexts are equal.
+   *
+   * @param obj the object to compare with; may be null
+   * @return true if {@code obj} is a {@code TaskImpl} whose context equals this task's context
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof TaskImpl)) {
+      return false;
+    }
+    TaskImpl other = (TaskImpl) obj;
+    return this.context.equals(other.context);
+  }
+
+  /**
+   * Turns the fetched data of one input into file fragments.
+   *
+   * <p>Every non-empty file found directly under {@code file} on the local file system becomes
+   * one fragment covering the whole file. In addition, every entry of {@code localChunks} whose
+   * execution block id equals {@code name} becomes a fragment covering just that chunk.
+   *
+   * @param file directory holding the fetched files
+   * @param name input name used as the fragment's table name and matched against chunk ids
+   * @param meta table meta of the input; not used by this method
+   * @return the fragments, possibly an empty array
+   * @throws IOException if the local file system cannot be obtained or listed
+   */
+  private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
+      throws IOException {
+    // Force the local file system regardless of the configured default file system.
+    Configuration localConf = new Configuration(systemConf);
+    localConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
+    FileSystem localFs = FileSystem.get(localConf);
+
+    List<FileFragment> fragments = new ArrayList<FileFragment>();
+
+    for (FileStatus status : localFs.listStatus(new Path(file.getAbsolutePath()))) {
+      if (status.getLen() != 0) {
+        fragments.add(new FileFragment(name, status.getPath(), 0L, status.getLen()));
+      }
+    }
+
+    // Special treatment for locally pseudo fetched chunks
+    synchronized (localChunks) {
+      for (FileChunk chunk : localChunks) {
+        if (!name.equals(chunk.getEbId())) {
+          continue;
+        }
+        Path chunkPath = new Path(chunk.getFile().getPath());
+        fragments.add(new FileFragment(name, chunkPath, chunk.startOffset(), chunk.length()));
+        LOG.info("One local chunk is added to listTablets");
+      }
+    }
+
+    return fragments.toArray(new FileFragment[fragments.size()]);
+  }
+
+  /**
+   * Runs one {@link Fetcher} with retries and counts down the task's fetch latch when it ends.
+   *
+   * <p>The fetch is attempted up to the configured maximum number of times, sleeping between
+   * attempts with a doubling delay capped at 10 seconds. If the fetcher does not end in the
+   * FETCH_FINISHED state, the enclosing task's context is stopped.
+   */
+  private class FetchRunner implements Runnable {
+    private final TaskAttemptContext ctx;
+    private final Fetcher fetcher;
+    private final int maxRetryNum;
+
+    /**
+     * @param ctx     context whose fetch latch is counted down when this runner ends
+     * @param fetcher the fetcher to run
+     */
+    public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
+      this.ctx = ctx;
+      this.fetcher = fetcher;
+      this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
+    }
+
+    @Override
+    public void run() {
+      int retryNum = 0;
+      int retryWaitTime = 1000; // milliseconds
+
+      try { // for releasing fetch latch
+        while(!context.isStopped() && retryNum < maxRetryNum) {
+          if (retryNum > 0) {
+            try {
+              Thread.sleep(retryWaitTime);
+              retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2);  // max 10 seconds
+            } catch (InterruptedException e) {
+              LOG.error(e);
+              // Keep the interrupt visible to the owner of this thread and stop retrying;
+              // the finally block below stops the task and releases the latch.
+              Thread.currentThread().interrupt();
+              break;
+            }
+            LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
+          }
+          try {
+            FileChunk fetched = fetcher.get();
+            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
+                && fetched.getFile() != null) {
+              // Chunks served from local disk are remembered so they can be read in place later.
+              if (!fetched.fromRemote()) {
+                localChunks.add(fetched);
+                LOG.info("Add a new FileChunk to local chunk list");
+              }
+              break;
+            }
+          } catch (Throwable e) {
+            LOG.error("Fetch failed: " + fetcher.getURI(), e);
+          }
+          retryNum++;
+        }
+      } finally {
+        if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){
+          fetcherFinished(ctx);
+        } else {
+          if (retryNum == maxRetryNum) {
+            LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
+          }
+          stopScriptExecutors();
+          context.stop(); // retry task
+          ctx.getFetchLatch().countDown();
+        }
+      }
+    }
+  }
+
+  /**
+   * Computes the fetch part of the task progress from the number of finished fetchers.
+   *
+   * @param totalFetcher  total number of fetchers of the task
+   * @param remainFetcher number of fetchers that have not finished yet
+   * @return the finished fraction multiplied by {@code FETCHER_PROGRESS}, or 0 when
+   *         {@code totalFetcher} is not positive
+   */
+  @VisibleForTesting
+  public static float adjustFetchProcess(int totalFetcher, int remainFetcher) {
+    if (totalFetcher <= 0) {
+      return 0.0f;
+    }
+    int finishedFetcher = totalFetcher - remainFetcher;
+    return (finishedFetcher / (float) totalFetcher) * FETCHER_PROGRESS;
+  }
+
+  /**
+   * Records that one fetcher has finished: counts down the fetch latch and refreshes the
+   * fetcher progress of the task. Does nothing when the task has no fetchers.
+   *
+   * @param ctx context owning the fetch latch
+   */
+  private synchronized void fetcherFinished(TaskAttemptContext ctx) {
+    final int totalFetchers = fetcherRunners.size();
+    if (totalFetchers == 0) {
+      return;
+    }
+
+    ctx.getFetchLatch().countDown();
+
+    // The latch count after the count-down is the number of fetchers still outstanding.
+    final int remaining = (int) ctx.getFetchLatch().getCount();
+    final float progress =
+        remaining == 0 ? FETCHER_PROGRESS : adjustFetchProcess(totalFetchers, remaining);
+    context.setFetcherProgress(progress);
+  }
+
+  /**
+   * Creates one {@link Fetcher} per fetch URI and registers the fetch phase on the context.
+   *
+   * <p>For a URI that points at this worker's own pull server, the stored chunk is looked up
+   * directly on local disk. If that lookup returns null the URI is skipped; if it fails or
+   * yields an unusable chunk, a regular fetch into the default store file is set up instead.
+   *
+   * @param ctx     context of the task attempt; receives the fetch phase when fetches exist
+   * @param fetches fetch descriptions of the task
+   * @return the created fetchers; an empty list when {@code fetches} is empty
+   * @throws IOException if a store directory cannot be created or a local path or host
+   *                     cannot be resolved
+   */
+  private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
+                                        List<FetchImpl> fetches) throws IOException {
+
+    if (fetches.isEmpty()) {
+      return Lists.newArrayList();
+    }
+
+    Path inputDir = executionBlockContext.getLocalDirAllocator().
+        getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
+
+    int i = 0;
+    List<Fetcher> runnerList = Lists.newArrayList();
+
+    for (FetchImpl f : fetches) {
+      File storeDir = new File(inputDir.toString(), f.getName());
+      if (!storeDir.exists() && !storeDir.mkdirs()) {
+        throw new IOException("Failed to create " + storeDir);
+      }
+
+      for (URI uri : f.getURIs()) {
+        File defaultStoreFile = new File(storeDir, "in_" + i);
+        InetAddress address = InetAddress.getByName(uri.getHost());
+
+        // Scoped per URI so that a chunk from a previous iteration can never be reused.
+        FileChunk storeChunk = null;
+        WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
+        if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
+          boolean hasError = false;
+          try {
+            LOG.info("Try to get local file chunk at local host");
+            storeChunk = getLocalStoredFileChunk(uri, systemConf);
+          } catch (Throwable t) {
+            // Do not lose the cause: the local shortcut failed, a regular fetch is used below.
+            LOG.warn("Failed to get local file chunk for " + uri, t);
+            hasError = true;
+          }
+
+          // When a range request is out of range, storeChunk will be NULL. This case is normal state.
+          // So, we should skip and don't need to create storeChunk.
+          if (storeChunk == null && !hasError) {
+            continue;
+          }
+
+          if (!hasError && storeChunk.getFile() != null && storeChunk.startOffset() > -1) {
+            storeChunk.setFromRemote(false);
+          } else {
+            storeChunk = new FileChunk(defaultStoreFile, 0, -1);
+            storeChunk.setFromRemote(true);
+          }
+        } else {
+          storeChunk = new FileChunk(defaultStoreFile, 0, -1);
+          storeChunk.setFromRemote(true);
+        }
+
+        // If we decide that intermediate data should be really fetched from a remote host, storeChunk
+        // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
+        storeChunk.setEbId(f.getName());
+        Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
+        LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
+        runnerList.add(fetcher);
+        i++;
+      }
+    }
+    ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
+    return runnerList;
+  }
+
+  /**
+   * Resolves a pull-server fetch URI against this worker's local disks.
+   *
+   * <p>The query parameters {@code qid}, {@code type}, {@code sid} and {@code p} are required;
+   * {@code ta} is required for range shuffle ({@code type=r}). For range shuffle the chunk is
+   * obtained from {@code TajoPullServerService.getFileChunks}; for hash ({@code h}) and
+   * scattered hash ({@code s}) shuffle it is the partition file, restricted to
+   * {@code offset}/{@code length} when both are given and non-negative.
+   *
+   * @param fetchURI the fetch URI addressed to the local pull server
+   * @param conf     configuration used to resolve local directories
+   * @return the local chunk, or null when the URI is invalid, the shuffle type is unknown,
+   *         the file does not exist, the start position is beyond the file, or
+   *         {@code getFileChunks} failed
+   * @throws IOException if a local path cannot be resolved
+   */
+  private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
+    // Parse the URI
+    LOG.info("getLocalStoredFileChunk starts");
+    final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters();
+    final List<String> types = params.get("type");
+    final List<String> qids = params.get("qid");
+    final List<String> taskIdList = params.get("ta");
+    final List<String> stageIds = params.get("sid");
+    final List<String> partIds = params.get("p");
+    final List<String> offsetList = params.get("offset");
+    final List<String> lengthList = params.get("length");
+
+    if (types == null || stageIds == null || qids == null || partIds == null) {
+      LOG.error("Invalid URI - Required queryId, type, stage Id, and part id");
+      return null;
+    }
+
+    // Each violation must reject the URI on its own. The former "a && b || c" form only
+    // rejected when qid AND type were both wrong, and never checked that a part id exists.
+    if (qids.size() != 1 || types.size() != 1 || stageIds.size() != 1 || partIds.isEmpty()) {
+      LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id");
+      return null;
+    }
+
+    String queryId = qids.get(0);
+    String shuffleType = types.get(0);
+    String sid = stageIds.get(0);
+    String partId = partIds.get(0);
+
+    if (shuffleType.equals("r") && taskIdList == null) {
+      LOG.error("Invalid URI - For range shuffle, taskId is required");
+      return null;
+    }
+
+    FileChunk chunk;
+    long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
+    long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
+
+    LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
+        + ", taskIds=" + taskIdList);
+
+    // The working directory of Tajo worker for each query, including stage
+    String queryBaseDir = queryId + "/output" + "/" + sid + "/";
+
+    // If the stage requires a range shuffle
+    if (shuffleType.equals("r")) {
+      // Only the first task id is used to locate the range-shuffle output.
+      String ta = splitMaps(taskIdList).get(0);
+      if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) {
+        LOG.warn("Range shuffle - file not exist");
+        return null;
+      }
+      Path path = executionBlockContext.getLocalFS().makeQualified(
+          executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
+      String startKey = params.get("start").get(0);
+      String endKey = params.get("end").get(0);
+      boolean last = params.get("final") != null;
+
+      try {
+        chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last);
+      } catch (Throwable t) {
+        // Log the cause as well; the message alone gives no hint of what went wrong.
+        LOG.error("getFileChunks() throws exception", t);
+        return null;
+      }
+
+      // If the stage requires a hash shuffle or a scattered hash shuffle
+    } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
+      int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+      String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
+      if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
+        LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
+        return null;
+      }
+      Path path = executionBlockContext.getLocalFS().makeQualified(
+          executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
+      File file = new File(path.toUri());
+      // A partial read is honored only when both offset and length were supplied.
+      long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+      long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+
+      if (startPos >= file.length()) {
+        LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
+        return null;
+      }
+      chunk = new FileChunk(file, startPos, readLen);
+
+    } else {
+      LOG.error("Unknown shuffle type");
+      return null;
+    }
+
+    return chunk;
+  }
+
+  /**
+   * Splits every element of the given list on commas and flattens the pieces into one list.
+   *
+   * @param mapq comma-separated values, one or more per element; may be null
+   * @return a new list with all pieces in their original order, or null when {@code mapq} is null
+   */
+  private List<String> splitMaps(List<String> mapq) {
+    if (mapq == null) {
+      return null;
+    }
+
+    final List<String> pieces = new ArrayList<String>();
+    for (String joined : mapq) {
+      for (String piece : joined.split(",")) {
+        pieces.add(piece);
+      }
+    }
+    return pieces;
+  }
+
+  /**
+   * Returns the directory of a task attempt: the base input directory of its execution block,
+   * followed by the task id and the attempt id as path components.
+   *
+   * @param quid id of the task attempt
+   * @return the path built from the three components above
+   */
+  public static Path getTaskAttemptDir(TaskAttemptId quid) {
+    Path baseInputDir = ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId());
+    String taskDirName = String.valueOf(quid.getTaskId().getId());
+    String attemptDirName = String.valueOf(quid.getId());
+    return StorageUtil.concatPath(baseInputDir, taskDirName, attemptDirName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
new file mode 100644
index 0000000..7990a72
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.worker.event.*;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Service that owns the {@link ExecutionBlockContext} of every execution block started on this
+ * worker. Contexts are created and stopped in response to {@link TaskManagerEvent}s and can be
+ * queried for their tasks and task histories.
+ */
+public class TaskManager extends AbstractService implements EventHandler<TaskManagerEvent> {
+  private static final Log LOG = LogFactory.getLog(TaskManager.class);
+
+  private final TajoWorker.WorkerContext workerContext;
+  private final Map<ExecutionBlockId, ExecutionBlockContext> executionBlockContextMap;
+  private final Dispatcher dispatcher;
+  private final EventHandler rmEventHandler;
+
+  private TajoConf tajoConf;
+
+  /**
+   * @param dispatcher     dispatcher this manager registers itself with during init
+   * @param workerContext  context of the worker hosting this manager
+   * @param rmEventHandler handler that receives a FLUSH_REPORTS node status event before an
+   *                       execution block is stopped
+   */
+  public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext, EventHandler rmEventHandler) {
+    super(TaskManager.class.getName());
+
+    this.executionBlockContextMap = Maps.newHashMap();
+    this.workerContext = workerContext;
+    this.dispatcher = dispatcher;
+    this.rmEventHandler = rmEventHandler;
+  }
+
+  /**
+   * Stores the configuration and registers this manager for task manager events.
+   *
+   * @throws IllegalArgumentException if {@code conf} is not a {@link TajoConf}
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    boolean isTajoConf = conf instanceof TajoConf;
+    if (!isTajoConf) {
+      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+    }
+
+    this.tajoConf = (TajoConf) conf;
+    dispatcher.register(TaskManagerEvent.EventType.class, this);
+    super.serviceInit(conf);
+  }
+
+  /** Stops every registered execution block context and forgets all of them. */
+  @Override
+  protected void serviceStop() throws Exception {
+    for (ExecutionBlockContext each : executionBlockContextMap.values()) {
+      each.stop();
+    }
+    executionBlockContextMap.clear();
+    super.serviceStop();
+  }
+
+  protected Dispatcher getDispatcher() {
+    return dispatcher;
+  }
+
+  protected TajoWorker.WorkerContext getWorkerContext() {
+    return workerContext;
+  }
+
+  /**
+   * Creates and initializes the context of a new execution block.
+   *
+   * @param request the run request describing the execution block
+   * @return the initialized context
+   * @throws RuntimeException wrapping any failure of the construction or initialization
+   */
+  protected ExecutionBlockContext createExecutionBlock(TajoWorkerProtocol.RunExecutionBlockRequestProto request) {
+    ExecutionBlockContext created;
+    try {
+      created = new ExecutionBlockContext(getWorkerContext(), null, request);
+      created.init();
+    } catch (Throwable e) {
+      LOG.fatal(e.getMessage(), e);
+      throw new RuntimeException(e);
+    }
+    return created;
+  }
+
+  /**
+   * Stops an execution block and removes the intermediate files named in the cleanup list.
+   *
+   * <p>The broadcast cache is released, the shuffle report is sent and the task histories are
+   * flushed. Whether or not that succeeds, the context is stopped and the input and output
+   * directories of every listed execution block are cleaned up.
+   *
+   * @param context     the context to stop; nothing happens when it is null
+   * @param cleanupList execution blocks whose intermediate directories are to be removed
+   * @throws RuntimeException wrapping a failure of the release/report/flush step
+   */
+  protected void stopExecutionBlock(ExecutionBlockContext context,
+                                    TajoWorkerProtocol.ExecutionBlockListProto cleanupList) {
+    if (context == null) {
+      return;
+    }
+
+    try {
+      context.getSharedResource().releaseBroadcastCache(context.getExecutionBlockId());
+      context.sendShuffleReport();
+      getWorkerContext().getTaskHistoryWriter().flushTaskHistories();
+    } catch (Exception e) {
+      LOG.fatal(e.getMessage(), e);
+      throw new RuntimeException(e);
+    } finally {
+      context.stop();
+
+      /* cleanup intermediate files */
+      for (TajoIdProtos.ExecutionBlockIdProto idProto : cleanupList.getExecutionBlockIdList()) {
+        String inputDir = ExecutionBlockContext.getBaseInputDir(new ExecutionBlockId(idProto)).toString();
+        workerContext.cleanup(inputDir);
+        String outputDir = ExecutionBlockContext.getBaseOutputDir(new ExecutionBlockId(idProto)).toString();
+        workerContext.cleanup(outputDir);
+      }
+    }
+    LOG.info("Stopped execution block:" + context.getExecutionBlockId());
+  }
+
+  /**
+   * Handles start and stop events of execution blocks; other event classes are only logged.
+   *
+   * @param event the event to process
+   */
+  @Override
+  public void handle(TaskManagerEvent event) {
+    LOG.info("======================== Processing " + event.getExecutionBlockId() + " of type " + event.getType());
+
+    if (event instanceof ExecutionBlockStartEvent) {
+      //receive event from NodeResourceManager
+      if (executionBlockContextMap.containsKey(event.getExecutionBlockId())) {
+        LOG.warn("Already initialized ExecutionBlock: " + event.getExecutionBlockId());
+      } else {
+        ExecutionBlockStartEvent startEvent = (ExecutionBlockStartEvent) event;
+        ExecutionBlockContext started = createExecutionBlock(startEvent.getRequestProto());
+        executionBlockContextMap.put(started.getExecutionBlockId(), started);
+      }
+    } else if (event instanceof ExecutionBlockStopEvent) {
+      //receive event from QueryMaster
+      rmEventHandler.handle(new NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS));
+
+      ExecutionBlockContext stopped = executionBlockContextMap.remove(event.getExecutionBlockId());
+      stopExecutionBlock(stopped, ((ExecutionBlockStopEvent) event).getCleanupList());
+    }
+  }
+
+  protected ExecutionBlockContext getExecutionBlockContext(ExecutionBlockId executionBlockId) {
+    return executionBlockContextMap.get(executionBlockId);
+  }
+
+  /**
+   * Looks up a task by its attempt id.
+   *
+   * @return the task, or null when its execution block is not registered here
+   */
+  public Task getTaskByTaskAttemptId(TaskAttemptId taskAttemptId) {
+    ExecutionBlockContext owner =
+        executionBlockContextMap.get(taskAttemptId.getTaskId().getExecutionBlockId());
+    return owner == null ? null : owner.getTask(taskAttemptId);
+  }
+
+  /**
+   * Returns the task histories held by the context of the given execution block.
+   *
+   * @return a new list; empty when the execution block is not registered here
+   */
+  public List<TaskHistory> getTaskHistories(ExecutionBlockId executionblockId) throws IOException {
+    List<TaskHistory> result = new ArrayList<TaskHistory>();
+    ExecutionBlockContext owner = executionBlockContextMap.get(executionblockId);
+    if (owner != null) {
+      result.addAll(owner.getTaskHistories().values());
+    }
+    //TODO get List<TaskHistory> from HistoryReader
+    return result;
+  }
+
+  /**
+   * Returns the history of one task.
+   *
+   * @return the history, or null when the execution block is not registered here or holds no
+   *         history for the task
+   */
+  public TaskHistory getTaskHistory(TaskId taskId) {
+    ExecutionBlockContext owner = executionBlockContextMap.get(taskId.getExecutionBlockId());
+    if (owner == null) {
+      //TODO get TaskHistory from HistoryReader
+      return null;
+    }
+    return owner.getTaskHistories().get(taskId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 774f358..207b47e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -34,10 +34,8 @@ import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.container.TajoContainerIdPBImpl;
 import org.apache.tajo.master.container.TajoConverterUtils;
 import org.apache.tajo.rpc.CallFuture;
-import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 
-import java.net.ConnectException;
 import java.util.concurrent.*;
 
 import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
@@ -45,6 +43,7 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
 /**
  * The driver class for Tajo Task processing.
  */
+@Deprecated
 public class TaskRunner extends AbstractService {
   /** class logger */
   private static final Log LOG = LogFactory.getLog(TaskRunner.class);
@@ -256,7 +255,7 @@ public class TaskRunner extends AbstractService {
                   LOG.info("Initializing: " + taskAttemptId);
                   Task task = null;
                   try {
-                    task = new Task(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext,
+                    task = new LegacyTaskImpl(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext,
                         new TaskRequestImpl(taskRequest));
                     getContext().getTasks().put(taskAttemptId, task);
 
@@ -269,10 +268,11 @@ public class TaskRunner extends AbstractService {
                   } catch (Throwable t) {
                     LOG.error(t.getMessage(), t);
                     fatalError(qmClientService, taskAttemptId, t.getMessage());
+                  } finally {
                     if(task != null) {
-                      task.cleanupTask();
+                      task.cleanup();
                     }
-                  } finally {
+
                     callFuture = null;
                     taskRequest = null;
                   }