Posted to commits@tajo.apache.org by jh...@apache.org on 2014/09/15 10:44:01 UTC

[1/2] TAJO-1015: Add executionblock event in worker. (jinho)

Repository: tajo
Updated Branches:
  refs/heads/master 64416dea6 -> 15450e868


http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 8009ce3..5eb66b8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -18,38 +18,45 @@
 
 package org.apache.tajo.worker;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
-import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.util.Pair;
+import org.apache.tajo.engine.utils.TupleCache;
+import org.apache.tajo.worker.event.TaskRunnerEvent;
+import org.apache.tajo.worker.event.TaskRunnerStartEvent;
+import org.apache.tajo.worker.event.TaskRunnerStopEvent;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.*;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
-public class TaskRunnerManager extends CompositeService {
+public class TaskRunnerManager extends CompositeService implements EventHandler<TaskRunnerEvent> {
   private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class);
 
-  private final Map<String, TaskRunner> taskRunnerMap = new HashMap<String, TaskRunner>();
-  private final Map<String, TaskRunnerHistory> taskRunnerHistoryMap = Maps.newConcurrentMap();
+  private final ConcurrentMap<ExecutionBlockId, ExecutionBlockContext> executionBlockContextMap = Maps.newConcurrentMap();
+  private final ConcurrentMap<String, TaskRunner> taskRunnerMap = Maps.newConcurrentMap();
+  private final ConcurrentMap<String, TaskRunnerHistory> taskRunnerHistoryMap = Maps.newConcurrentMap();
   private TajoWorker.WorkerContext workerContext;
   private TajoConf tajoConf;
   private AtomicBoolean stop = new AtomicBoolean(false);
   private FinishedTaskCleanThread finishedTaskCleanThread;
+  private Dispatcher dispatcher;
 
-  public TaskRunnerManager(TajoWorker.WorkerContext workerContext) {
+  public TaskRunnerManager(TajoWorker.WorkerContext workerContext, Dispatcher dispatcher) {
     super(TaskRunnerManager.class.getName());
 
     this.workerContext = workerContext;
+    this.dispatcher = dispatcher;
   }
 
   public TajoWorker.WorkerContext getWorkerContext() {
@@ -58,7 +65,9 @@ public class TaskRunnerManager extends CompositeService {
 
   @Override
   public void init(Configuration conf) {
+    Preconditions.checkArgument(conf instanceof TajoConf);
     tajoConf = (TajoConf)conf;
+    dispatcher.register(TaskRunnerEvent.EventType.class, this);
     super.init(tajoConf);
   }
 
@@ -82,6 +91,9 @@ public class TaskRunnerManager extends CompositeService {
         }
       }
     }
+    for(ExecutionBlockContext context: executionBlockContextMap.values()) {
+      context.stop();
+    }
 
     if(finishedTaskCleanThread != null) {
       finishedTaskCleanThread.interrupted();
@@ -92,148 +104,35 @@ public class TaskRunnerManager extends CompositeService {
     }
   }
 
-  public void stopTask(String id) {
+  public void stopTaskRunner(String id) {
     LOG.info("Stop Task:" + id);
-    synchronized(taskRunnerMap) {
-      TaskRunner taskRunner = taskRunnerMap.remove(id);
-      if (taskRunner != null) {
-        synchronized(taskRunnerCompleteCounter) {
-          ExecutionBlockId ebId = taskRunner.getContext().getExecutionBlockId();
-          AtomicInteger ebSuccessedTaskNums = successedTaskNums.get(ebId);
-          if (ebSuccessedTaskNums == null) {
-            ebSuccessedTaskNums = new AtomicInteger(taskRunner.getContext().succeededTasksNum.get());
-            successedTaskNums.put(ebId, ebSuccessedTaskNums);
-          } else {
-            ebSuccessedTaskNums.addAndGet(taskRunner.getContext().succeededTasksNum.get());
-          }
-
-          Pair<AtomicInteger, AtomicInteger> counter = taskRunnerCompleteCounter.get(ebId);
-
-          if (counter != null) {
-            if (counter.getSecond().decrementAndGet() <= 0) {
-              LOG.info(ebId + "'s all tasks are completed.");
-              try {
-                closeExecutionBlock(ebId, ebSuccessedTaskNums.get(), taskRunner);
-              } catch (Exception e) {
-                LOG.error(ebId + ", closing error:" + e.getMessage(), e);
-              }
-              successedTaskNums.remove(ebId);
-              taskRunnerCompleteCounter.remove(ebId);
-            }
-          }
-        }
-      }
-    }
+    TaskRunner taskRunner = taskRunnerMap.remove(id);
+    taskRunner.stop();
     if(workerContext.isYarnContainerMode()) {
       stop();
     }
   }
 
-  private void closeExecutionBlock(ExecutionBlockId ebId, int succeededTasks, TaskRunner lastTaskRunner) throws Exception {
-    TajoWorkerProtocol.ExecutionBlockReport.Builder reporterBuilder =
-        TajoWorkerProtocol.ExecutionBlockReport.newBuilder();
-    reporterBuilder.setEbId(ebId.getProto());
-    reporterBuilder.setReportSuccess(true);
-    reporterBuilder.setSucceededTasks(succeededTasks);
-    try {
-      List<TajoWorkerProtocol.IntermediateEntryProto> intermediateEntries =
-          new ArrayList<TajoWorkerProtocol.IntermediateEntryProto>();
-      List<HashShuffleAppenderManager.HashShuffleIntermediate> shuffles =
-          workerContext.getHashShuffleAppenderManager().close(ebId);
-      if (shuffles == null) {
-        reporterBuilder.addAllIntermediateEntries(intermediateEntries);
-        lastTaskRunner.sendExecutionBlockReport(reporterBuilder.build());
-        return;
-      }
-
-      TajoWorkerProtocol.IntermediateEntryProto.Builder intermediateBuilder =
-          TajoWorkerProtocol.IntermediateEntryProto.newBuilder();
-      TajoWorkerProtocol.IntermediateEntryProto.PageProto.Builder pageBuilder =
-          TajoWorkerProtocol.IntermediateEntryProto.PageProto.newBuilder();
-      TajoWorkerProtocol.FailureIntermediateProto.Builder failureBuilder =
-          TajoWorkerProtocol.FailureIntermediateProto.newBuilder();
-
-      for (HashShuffleAppenderManager.HashShuffleIntermediate eachShuffle: shuffles) {
-        List<TajoWorkerProtocol.IntermediateEntryProto.PageProto> pages =
-            new ArrayList<TajoWorkerProtocol.IntermediateEntryProto.PageProto>();
-        List<TajoWorkerProtocol.FailureIntermediateProto> failureIntermediateItems =
-            new ArrayList<TajoWorkerProtocol.FailureIntermediateProto>();
-
-        for (Pair<Long, Integer> eachPage: eachShuffle.getPages()) {
-          pageBuilder.clear();
-          pageBuilder.setPos(eachPage.getFirst());
-          pageBuilder.setLength(eachPage.getSecond());
-          pages.add(pageBuilder.build());
-        }
-
-        for(Pair<Long, Pair<Integer, Integer>> eachFailure: eachShuffle.getFailureTskTupleIndexes()) {
-          failureBuilder.clear();
-          failureBuilder.setPagePos(eachFailure.getFirst());
-          failureBuilder.setStartRowNum(eachFailure.getSecond().getFirst());
-          failureBuilder.setEndRowNum(eachFailure.getSecond().getSecond());
-          failureIntermediateItems.add(failureBuilder.build());
-        }
-
-        intermediateBuilder.clear();
-
-        intermediateBuilder.setEbId(ebId.getProto())
-            .setHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName() + ":" +
-                workerContext.getPullServerPort())
-            .setTaskId(-1)
-            .setAttemptId(-1)
-            .setPartId(eachShuffle.getPartId())
-            .setVolume(eachShuffle.getVolume())
-            .addAllPages(pages)
-            .addAllFailures(failureIntermediateItems);
-
-        intermediateEntries.add(intermediateBuilder.build());
-      }
-
-      // send intermediateEntries to QueryMaster
-      reporterBuilder.addAllIntermediateEntries(intermediateEntries);
-
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      reporterBuilder.setReportSuccess(false);
-      if (e.getMessage() == null) {
-        reporterBuilder.setReportErrorMessage(e.getClass().getSimpleName());
-      } else {
-        reporterBuilder.setReportErrorMessage(e.getMessage());
-      }
-    }
-    lastTaskRunner.sendExecutionBlockReport(reporterBuilder.build());
-  }
-
   public Collection<TaskRunner> getTaskRunners() {
-    synchronized(taskRunnerMap) {
-      return Collections.unmodifiableCollection(taskRunnerMap.values());
-    }
+    return Collections.unmodifiableCollection(taskRunnerMap.values());
   }
 
   public Collection<TaskRunnerHistory> getExecutionBlockHistories() {
-    synchronized(taskRunnerHistoryMap) {
-      return Collections.unmodifiableCollection(taskRunnerHistoryMap.values());
-    }
+    return Collections.unmodifiableCollection(taskRunnerHistoryMap.values());
   }
 
   public TaskRunnerHistory getExcutionBlockHistoryByTaskRunnerId(String taskRunnerId) {
-    synchronized(taskRunnerHistoryMap) {
-      return taskRunnerHistoryMap.get(taskRunnerId);
-    }
+    return taskRunnerHistoryMap.get(taskRunnerId);
   }
 
   public TaskRunner getTaskRunner(String taskRunnerId) {
-    synchronized(taskRunnerMap) {
-      return taskRunnerMap.get(taskRunnerId);
-    }
+    return taskRunnerMap.get(taskRunnerId);
   }
 
-  public Task getTaskByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
-    synchronized(taskRunnerMap) {
-      for (TaskRunner eachTaskRunner: taskRunnerMap.values()) {
-        Task task = eachTaskRunner.getContext().getTask(quAttemptId);
-        if (task != null) return task;
-      }
+  public Task getTaskByQueryUnitAttemptId(QueryUnitAttemptId queryUnitAttemptId) {
+    ExecutionBlockContext context = executionBlockContextMap.get(queryUnitAttemptId.getQueryUnitId().getExecutionBlockId());
+    if (context != null) {
+      return context.getTask(queryUnitAttemptId);
     }
     return null;
   }
@@ -250,53 +149,61 @@ public class TaskRunnerManager extends CompositeService {
   }
 
   public int getNumTasks() {
-    synchronized(taskRunnerMap) {
-      return taskRunnerMap.size();
-    }
+    return taskRunnerMap.size();
   }
 
-  //<# tasks, # running tasks>
-  Map<ExecutionBlockId, Pair<AtomicInteger, AtomicInteger>> taskRunnerCompleteCounter =
-      new HashMap<ExecutionBlockId, Pair<AtomicInteger, AtomicInteger>>();
-
-  Map<ExecutionBlockId, AtomicInteger> successedTaskNums = new HashMap<ExecutionBlockId, AtomicInteger>();
-
-  public void startTask(final String[] params) {
-    //TODO change to use event dispatcher
-    Thread t = new Thread() {
-      public void run() {
+  @Override
+  public void handle(TaskRunnerEvent event) {
+    LOG.info("======================== Processing " + event.getExecutionBlockId() + " of type " + event.getType());
+    if (event instanceof TaskRunnerStartEvent) {
+      TaskRunnerStartEvent startEvent = (TaskRunnerStartEvent) event;
+      ExecutionBlockContext context = executionBlockContextMap.get(event.getExecutionBlockId());
+      String[] params = startEvent.getParams();
+      if(context == null){
         try {
-          TajoConf systemConf = new TajoConf(tajoConf);
-          TaskRunner taskRunner = new TaskRunner(TaskRunnerManager.this, systemConf, params);
-          LOG.info("Start TaskRunner:" + taskRunner.getId());
-          synchronized(taskRunnerMap) {
-            taskRunnerMap.put(taskRunner.getId(), taskRunner);
-          }
+          // QueryMaster's address
+          String host = params[4];
+          int port = Integer.parseInt(params[5]);
+
+          context = new ExecutionBlockContext(this, startEvent, new InetSocketAddress(host, port));
+        } catch (Throwable e) {
+          LOG.fatal(e.getMessage(), e);
+          throw new RuntimeException(e);
+        }
+        executionBlockContextMap.put(event.getExecutionBlockId(), context);
+      }
 
-          synchronized (taskRunnerHistoryMap){
-            taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getContext().getExcutionBlockHistory());
-          }
+      TaskRunner taskRunner = new TaskRunner(context, params);
+      LOG.info("Start TaskRunner:" + taskRunner.getId());
+      taskRunnerMap.put(taskRunner.getId(), taskRunner);
+      taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getHistory());
 
-          synchronized(taskRunnerCompleteCounter) {
-            ExecutionBlockId ebId = taskRunner.getContext().getExecutionBlockId();
-            Pair<AtomicInteger, AtomicInteger> counter = taskRunnerCompleteCounter.get(ebId);
-            if (counter == null) {
-              counter = new Pair(new AtomicInteger(0), new AtomicInteger(0));
-              taskRunnerCompleteCounter.put(ebId, counter);
-            }
-            counter.getFirst().incrementAndGet();
-            counter.getSecond().incrementAndGet();
-          }
-          taskRunner.init(systemConf);
-          taskRunner.start();
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-          throw new RuntimeException(e.getMessage(), e);
+      taskRunner.init(context.getConf());
+      taskRunner.start();
+
+    } else if (event instanceof TaskRunnerStopEvent) {
+      ExecutionBlockContext executionBlockContext =  executionBlockContextMap.remove(event.getExecutionBlockId());
+      if(executionBlockContext != null){
+        TupleCache.getInstance().removeBroadcastCache(event.getExecutionBlockId());
+        executionBlockContext.reportExecutionBlock(event.getExecutionBlockId());
+        executionBlockContext.stop();
+        try {
+          workerContext.getHashShuffleAppenderManager().close(event.getExecutionBlockId());
+        } catch (IOException e) {
+          LOG.fatal(e.getMessage(), e);
+          throw new RuntimeException(e);
         }
       }
-    };
+      LOG.info("Stopped execution block:" + event.getExecutionBlockId());
+    }
+  }
+
+  public EventHandler getEventHandler(){
+    return dispatcher.getEventHandler();
+  }
 
-    t.start();
+  public TajoConf getTajoConf() {
+    return tajoConf;
   }
 
   class FinishedTaskCleanThread extends Thread {
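
For context on how these events reach TaskRunnerManager.handle(): the manager now registers itself on the shared dispatcher for TaskRunnerEvent.EventType in init(), so callers post START/STOP events instead of spawning a thread the way the old startTask() did. Below is a minimal sketch of such a caller, assuming it already holds the manager plus the values the worker RPC service would extract from RunExecutionBlockRequestProto; the params/queryContext/planJson names are placeholders, and this is not the actual TajoWorkerManagerService code.

  import org.apache.tajo.ExecutionBlockId;
  import org.apache.tajo.engine.query.QueryContext;
  import org.apache.tajo.worker.TaskRunnerManager;
  import org.apache.tajo.worker.event.TaskRunnerStartEvent;
  import org.apache.tajo.worker.event.TaskRunnerStopEvent;

  public class TaskRunnerEventExample {
    public static void startThenStop(TaskRunnerManager manager, ExecutionBlockId ebId,
                                     String[] params, QueryContext queryContext, String planJson) {
      // routed asynchronously to TaskRunnerManager.handle(TaskRunnerStartEvent),
      // which creates the ExecutionBlockContext (if absent) and a new TaskRunner
      manager.getEventHandler().handle(new TaskRunnerStartEvent(params, ebId, queryContext, planJson));

      // later, e.g. when the QueryMaster calls stopExecutionBlock(): the STOP event
      // removes the context, reports the execution block, and closes shuffle outputs
      manager.getEventHandler().handle(new TaskRunnerStopEvent(ebId));
    }
  }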

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
new file mode 100644
index 0000000..aac8973
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.ExecutionBlockId;
+
+public class TaskRunnerEvent extends AbstractEvent<TaskRunnerEvent.EventType> {
+  public enum EventType {
+    START,
+    STOP
+  }
+
+  protected final ExecutionBlockId executionBlockId;
+
+  public TaskRunnerEvent(EventType eventType,
+                         ExecutionBlockId executionBlockId) {
+    super(eventType);
+    this.executionBlockId = executionBlockId;
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
new file mode 100644
index 0000000..8c9fa51
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.engine.query.QueryContext;
+
+public class TaskRunnerStartEvent extends TaskRunnerEvent {
+
+  private final QueryContext queryContext;
+  private final String[] params;
+  private final String plan;
+
+  public TaskRunnerStartEvent(String[] params,
+                              ExecutionBlockId executionBlockId,
+                              QueryContext context,
+                              String plan) {
+    super(EventType.START, executionBlockId);
+    this.params = params;
+    this.queryContext = context;
+    this.plan = plan;
+  }
+
+  public String[] getParams(){
+    return this.params;
+  }
+
+  public QueryContext getQueryContext() {
+    return queryContext;
+  }
+
+  public String getPlan() {
+    return plan;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
new file mode 100644
index 0000000..c8ec20d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.ExecutionBlockId;
+
+public class TaskRunnerStopEvent extends TaskRunnerEvent {
+
+  public TaskRunnerStopEvent(ExecutionBlockId executionBlockId) {
+    super(EventType.STOP, executionBlockId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto
index 0119a88..06d2a42 100644
--- a/tajo-core/src/main/proto/QueryMasterProtocol.proto
+++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto
@@ -32,7 +32,7 @@ service QueryMasterProtocolService {
   //from Worker
   rpc getTask(GetTaskRequestProto) returns (QueryUnitRequestProto);
   rpc statusUpdate (TaskStatusProto) returns (BoolProto);
-  rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
+  rpc ping (ExecutionBlockIdProto) returns (BoolProto);
   rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
   rpc done (TaskCompletionReport) returns (BoolProto);
   rpc doneExecutionBlock(ExecutionBlockReport) returns (BoolProto);

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index e100c48..dff2733 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -201,7 +201,7 @@ message DataChannelProto {
 }
 
 message RunExecutionBlockRequestProto {
-    required string executionBlockId = 1;
+    required ExecutionBlockIdProto executionBlockId = 1;
     required string queryMasterHost = 2;
     required int32 queryMasterPort = 3;
     required string nodeId = 4;
@@ -220,7 +220,8 @@ service TajoWorkerProtocolService {
   rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
 
   // from QueryMaster(Worker)
-  rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
+  rpc startExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
+  rpc stopExecutionBlock(ExecutionBlockIdProto) returns (BoolProto);
   rpc killTaskAttempt(QueryUnitAttemptIdProto) returns (BoolProto);
   rpc cleanup(QueryIdProto) returns (BoolProto);
   rpc cleanupExecutionBlocks(ExecutionBlockListProto) returns (BoolProto);

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index 95c06bb..b755e02 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -106,6 +106,8 @@ public class TestFetcher {
 
   @Test
   public void testAdjustFetchProcess() {
+    assertEquals(0.0f, Task.adjustFetchProcess(0, 0), 0);
+    assertEquals(0.0f, Task.adjustFetchProcess(10, 10), 0);
     assertEquals(0.05f, Task.adjustFetchProcess(10, 9), 0);
     assertEquals(0.1f, Task.adjustFetchProcess(10, 8), 0);
     assertEquals(0.25f, Task.adjustFetchProcess(10, 5), 0);
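
The two added assertions pin the boundary cases where there are no fetchers at all or nothing has been fetched yet. Task.adjustFetchProcess itself is not shown in this diff; the standalone sketch below is merely one implementation consistent with all five asserted values (it assumes the fetch phase contributes half of the overall task progress, which is inferred from the numbers, not from Task.java).

  // hypothetical helper, consistent with the test expectations above
  static float adjustFetchProcess(int totalFetcher, int remainingFetcher) {
    if (totalFetcher > 0) {
      // fetched fraction, scaled to half of the task's total progress
      return ((float) (totalFetcher - remainingFetcher) / (float) totalFetcher) * 0.5f;
    }
    return 0.0f; // covers the new (0, 0) case without dividing by zero
  }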


[2/2] git commit: TAJO-1015: Add executionblock event in worker. (jinho)

Posted by jh...@apache.org.
TAJO-1015: Add executionblock event in worker. (jinho)

Closes #124


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/15450e86
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/15450e86
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/15450e86

Branch: refs/heads/master
Commit: 15450e868ae6a985deb988e679d19e1364c95526
Parents: 64416de
Author: jhkim <jh...@apache.org>
Authored: Mon Sep 15 17:42:38 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Mon Sep 15 17:42:38 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   3 +
 .../apache/tajo/master/TajoContainerProxy.java  |   4 +-
 .../querymaster/QueryMasterManagerService.java  |   5 +-
 .../tajo/master/querymaster/SubQuery.java       |   2 +-
 .../tajo/worker/ExecutionBlockContext.java      | 437 +++++++++++++++++++
 .../worker/ExecutionBlockSharedResource.java    |  36 +-
 .../tajo/worker/TajoResourceAllocator.java      |  41 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |  82 ++--
 .../tajo/worker/TajoWorkerManagerService.java   |  46 +-
 .../main/java/org/apache/tajo/worker/Task.java  | 285 ++++--------
 .../apache/tajo/worker/TaskAttemptContext.java  |  47 +-
 .../java/org/apache/tajo/worker/TaskRunner.java | 277 ++----------
 .../apache/tajo/worker/TaskRunnerManager.java   | 253 ++++-------
 .../tajo/worker/event/TaskRunnerEvent.java      |  41 ++
 .../tajo/worker/event/TaskRunnerStartEvent.java |  51 +++
 .../tajo/worker/event/TaskRunnerStopEvent.java  |  28 ++
 .../src/main/proto/QueryMasterProtocol.proto    |   2 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |   5 +-
 .../org/apache/tajo/worker/TestFetcher.java     |   2 +
 20 files changed, 924 insertions(+), 725 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 6def7f8..61fcec0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -431,6 +431,8 @@ Release 0.9.0 - unreleased
 
   SUB TASKS
 
+    TAJO-1015: Add executionblock event in worker. (jinho)
+
     TAJO-783: Remove yarn-related code from tajo-core. (hyunsik)
 
 Release 0.8.0

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 9aead24..a089b54 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -262,6 +262,9 @@ public class TajoConf extends Configuration {
     // Client RPC
     RPC_CLIENT_WORKER_THREAD_NUM("tajo.rpc.client.worker-thread-num", 4),
 
+    SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM("tajo.shuffle.rpc.client.worker-thread-num",
+        Runtime.getRuntime().availableProcessors()),
+
     //Client service RPC Server
     MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM("tajo.master.service.rpc.server.worker-thread-num",
         Runtime.getRuntime().availableProcessors() * 1),
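
This adds a dedicated thread-count knob for the shuffle fetcher's Netty client (consumed by getShuffleChannelFactory() in the new ExecutionBlockContext below), defaulting to the number of available processors. A small illustrative sketch of reading and overriding it through the getIntVar/setIntVar accessors used elsewhere in this change; the same key, tajo.shuffle.rpc.client.worker-thread-num, can also be set in the site configuration.

  import org.apache.tajo.conf.TajoConf;

  class ShuffleThreadTuning {
    // illustrative only; the override value here is arbitrary
    static void halveShuffleClientThreads(TajoConf conf) {
      int current = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM);
      conf.setIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM, Math.max(1, current / 2));
    }
  }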

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index f3e4b72..c317ba5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -101,7 +101,7 @@ public class TajoContainerProxy extends ContainerProxy {
 
       TajoWorkerProtocol.RunExecutionBlockRequestProto request =
           TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder()
-              .setExecutionBlockId(executionBlockId.toString())
+              .setExecutionBlockId(executionBlockId.getProto())
               .setQueryMasterHost(myAddr.getHostName())
               .setQueryMasterPort(myAddr.getPort())
               .setNodeId(container.getNodeId().toString())
@@ -111,7 +111,7 @@ public class TajoContainerProxy extends ContainerProxy {
               .setPlanJson(planJson)
               .build();
 
-      tajoWorkerRpcClient.executeExecutionBlock(null, request, NullCallback.get());
+      tajoWorkerRpcClient.startExecutionBlock(null, request, NullCallback.get());
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
     } finally {

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 4f3c2ab..862dfef 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -40,8 +40,6 @@ import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.worker.TajoWorker;
 
 import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 public class QueryMasterManagerService extends CompositeService
     implements QueryMasterProtocol.QueryMasterProtocolService.Interface {
@@ -177,7 +175,7 @@ public class QueryMasterManagerService extends CompositeService
 
   @Override
   public void ping(RpcController controller,
-                   TajoIdProtos.QueryUnitAttemptIdProto attemptId,
+                   TajoIdProtos.ExecutionBlockIdProto requestProto,
                    RpcCallback<PrimitiveProtos.BoolProto> done) {
     done.run(TajoWorker.TRUE_PROTO);
   }
@@ -225,6 +223,7 @@ public class QueryMasterManagerService extends CompositeService
       ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId());
       queryMasterTask.getQuery().getSubQuery(ebId).receiveExecutionBlockReport(request);
     }
+    done.run(TajoWorker.TRUE_PROTO);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 850ffe8..92ef9f9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -348,7 +348,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
 
     if (totalProgress > 0.0f) {
-      return (float) Math.floor((totalProgress / (float) tempTasks.size()) * 1000.0f) / 1000.0f;
+      return (float) Math.floor((totalProgress / (float) Math.max(tempTasks.size(), 1)) * 1000.0f) / 1000.0f;
     } else {
       return 0.0f;
     }
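
Without the Math.max guard, an empty tempTasks list would make the divisor zero, so a positive totalProgress would yield Infinity (or NaN) instead of a usable progress value. A standalone sketch of the guarded arithmetic, for illustration only (not SubQuery itself):

  // truncates progress to three decimal places and never divides by zero
  static float truncatedProgress(float totalProgress, int taskCount) {
    if (totalProgress > 0.0f) {
      return (float) Math.floor((totalProgress / (float) Math.max(taskCount, 1)) * 1000.0f) / 1000.0f;
    }
    return 0.0f;
  }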

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
new file mode 100644
index 0000000..306ab66
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -0,0 +1,437 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.util.Pair;
+import org.apache.tajo.worker.event.TaskRunnerStartEvent;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+
+public class ExecutionBlockContext {
+  /** class logger */
+  private static final Log LOG = LogFactory.getLog(ExecutionBlockContext.class);
+
+  private TaskRunnerManager manager;
+  public AtomicInteger completedTasksNum = new AtomicInteger();
+  public AtomicInteger succeededTasksNum = new AtomicInteger();
+  public AtomicInteger killedTasksNum = new AtomicInteger();
+  public AtomicInteger failedTasksNum = new AtomicInteger();
+
+  private ClientSocketChannelFactory channelFactory;
+  // for temporal or intermediate files
+  private FileSystem localFS;
+  // for input files
+  private FileSystem defaultFS;
+  private ExecutionBlockId executionBlockId;
+  private QueryContext queryContext;
+  private String plan;
+
+  private ExecutionBlockSharedResource resource;
+
+  private TajoQueryEngine queryEngine;
+  private RpcConnectionPool connPool;
+  private InetSocketAddress qmMasterAddr;
+  private TajoConf systemConf;
+  // for the doAs block
+  private UserGroupInformation taskOwner;
+
+  private Reporter reporter;
+
+  private AtomicBoolean stop = new AtomicBoolean();
+
+  // It keeps all of the query unit attempts while a TaskRunner is running.
+  private final ConcurrentMap<QueryUnitAttemptId, Task> tasks = Maps.newConcurrentMap();
+
+  private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap();
+
+  public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, InetSocketAddress queryMaster)
+      throws Throwable {
+    this.manager = manager;
+    this.executionBlockId = event.getExecutionBlockId();
+    this.connPool = RpcConnectionPool.getPool(manager.getTajoConf());
+    this.qmMasterAddr = queryMaster;
+    this.systemConf = manager.getTajoConf();
+    this.reporter = new Reporter();
+    this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
+    this.localFS = FileSystem.getLocal(systemConf);
+
+    // Setup QueryEngine according to the query plan
+    // Here, we can setup row-based query engine or columnar query engine.
+    this.queryEngine = new TajoQueryEngine(systemConf);
+    this.queryContext = event.getQueryContext();
+    this.plan = event.getPlan();
+    this.resource = new ExecutionBlockSharedResource();
+
+    init();
+  }
+
+  public void init() throws Throwable {
+
+    LOG.info("Tajo Root Dir: " + systemConf.getVar(TajoConf.ConfVars.ROOT_DIR));
+    LOG.info("Worker Local Dir: " + systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR));
+
+    LOG.info("QueryMaster Address:" + qmMasterAddr);
+
+    UserGroupInformation.setConfiguration(systemConf);
+    // TODO - 'load credential' should be implemented
+    // Getting taskOwner
+    UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME));
+
+    // initialize DFS and LocalFileSystems
+    this.taskOwner = taskOwner;
+    this.reporter.startReporter();
+
+    // resource intiailization
+    try{
+      this.resource.initialize(queryContext, plan);
+    } catch (Throwable e) {
+      getQueryMasterStub().killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
+      throw e;
+    }
+  }
+
+  public ExecutionBlockSharedResource getSharedResource() {
+    return resource;
+  }
+
+  public QueryMasterProtocol.QueryMasterProtocolService.Interface getQueryMasterStub() throws Exception {
+    NettyClientBase clientBase = null;
+    try {
+      clientBase = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
+      return clientBase.getStub();
+    } finally {
+      connPool.releaseConnection(clientBase);
+    }
+  }
+
+  public void stop(){
+    if(stop.getAndSet(true)){
+      return;
+    }
+
+    try {
+      reporter.stop();
+    } catch (InterruptedException e) {
+      LOG.error(e);
+    }
+
+    // If ExecutionBlock is stopped, all running or pending tasks will be marked as failed.
+    for (Task task : tasks.values()) {
+      if (task.getStatus() == TajoProtos.TaskAttemptState.TA_PENDING ||
+          task.getStatus() == TajoProtos.TaskAttemptState.TA_RUNNING) {
+        task.setState(TajoProtos.TaskAttemptState.TA_FAILED);
+        try{
+          task.abort();
+        } catch (Throwable e){
+          LOG.error(e);
+        }
+      }
+    }
+    tasks.clear();
+
+    resource.release();
+
+    try {
+      releaseShuffleChannelFactory();
+    } catch (Throwable e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  public TajoConf getConf() {
+    return manager.getTajoConf();
+  }
+
+  public FileSystem getLocalFS() {
+    return localFS;
+  }
+
+  public FileSystem getDefaultFS() {
+    return defaultFS;
+  }
+
+  public LocalDirAllocator getLocalDirAllocator() {
+    return manager.getWorkerContext().getLocalDirAllocator();
+  }
+
+  public TajoQueryEngine getTQueryEngine() {
+    return queryEngine;
+  }
+
+  // for the local temporal dir
+  public Path createBaseDir() throws IOException {
+    // the base dir for an output dir
+    String baseDir = getBaseOutputDir(executionBlockId).toString();
+    Path baseDirPath = localFS.makeQualified(getLocalDirAllocator().getLocalPathForWrite(baseDir, systemConf));
+    return baseDirPath;
+  }
+
+  public static Path getBaseOutputDir(ExecutionBlockId executionBlockId) {
+    Path workDir =
+        StorageUtil.concatPath(
+            executionBlockId.getQueryId().toString(),
+            "output",
+            String.valueOf(executionBlockId.getId()));
+    return workDir;
+  }
+
+  public static Path getBaseInputDir(ExecutionBlockId executionBlockId) {
+    Path workDir =
+        StorageUtil.concatPath(
+            executionBlockId.getQueryId().toString(),
+            "in",
+            executionBlockId.toString());
+    return workDir;
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
+  }
+
+  public Map<QueryUnitAttemptId, Task> getTasks() {
+    return tasks;
+  }
+
+  public Task getTask(QueryUnitAttemptId queryUnitAttemptId){
+    return tasks.get(queryUnitAttemptId);
+  }
+
+  public void stopTaskRunner(String id){
+    manager.stopTaskRunner(id);
+  }
+
+  public TaskRunner getTaskRunner(String taskRunnerId){
+    return manager.getTaskRunner(taskRunnerId);
+  }
+
+  public void addTaskHistory(String taskRunnerId, QueryUnitAttemptId quAttemptId, TaskHistory taskHistory) {
+    histories.get(taskRunnerId).addTaskHistory(quAttemptId, taskHistory);
+  }
+
+  public TaskRunnerHistory createTaskRunnerHistory(TaskRunner runner){
+    histories.putIfAbsent(runner.getId(), new TaskRunnerHistory(runner.getContainerId(), executionBlockId));
+    return histories.get(runner.getId());
+  }
+
+  public TajoWorker.WorkerContext getWorkerContext(){
+    return manager.getWorkerContext();
+  }
+
+  protected ClientSocketChannelFactory getShuffleChannelFactory(){
+    if(channelFactory == null) {
+      int workerNum = getConf().getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM);
+      channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", workerNum);
+    }
+    return channelFactory;
+  }
+
+  protected void releaseShuffleChannelFactory(){
+    if(channelFactory != null) {
+      channelFactory.shutdown();
+      channelFactory.releaseExternalResources();
+      channelFactory = null;
+    }
+  }
+
+  private void sendExecutionBlockReport(ExecutionBlockReport reporter) throws Exception {
+    getQueryMasterStub().doneExecutionBlock(null, reporter, NullCallback.get());
+  }
+
+  protected void reportExecutionBlock(ExecutionBlockId ebId) {
+    ExecutionBlockReport.Builder reporterBuilder = ExecutionBlockReport.newBuilder();
+    reporterBuilder.setEbId(ebId.getProto());
+    reporterBuilder.setReportSuccess(true);
+    reporterBuilder.setSucceededTasks(succeededTasksNum.get());
+    try {
+      List<IntermediateEntryProto> intermediateEntries = Lists.newArrayList();
+      List<HashShuffleAppenderManager.HashShuffleIntermediate> shuffles =
+          getWorkerContext().getHashShuffleAppenderManager().close(ebId);
+      if (shuffles == null) {
+        reporterBuilder.addAllIntermediateEntries(intermediateEntries);
+        sendExecutionBlockReport(reporterBuilder.build());
+        return;
+      }
+
+      IntermediateEntryProto.Builder intermediateBuilder = IntermediateEntryProto.newBuilder();
+      IntermediateEntryProto.PageProto.Builder pageBuilder = IntermediateEntryProto.PageProto.newBuilder();
+      FailureIntermediateProto.Builder failureBuilder = FailureIntermediateProto.newBuilder();
+
+      for (HashShuffleAppenderManager.HashShuffleIntermediate eachShuffle: shuffles) {
+        List<IntermediateEntryProto.PageProto> pages = Lists.newArrayList();
+        List<FailureIntermediateProto> failureIntermediateItems = Lists.newArrayList();
+
+        for (Pair<Long, Integer> eachPage: eachShuffle.getPages()) {
+          pageBuilder.clear();
+          pageBuilder.setPos(eachPage.getFirst());
+          pageBuilder.setLength(eachPage.getSecond());
+          pages.add(pageBuilder.build());
+        }
+
+        for(Pair<Long, Pair<Integer, Integer>> eachFailure: eachShuffle.getFailureTskTupleIndexes()) {
+          failureBuilder.clear();
+          failureBuilder.setPagePos(eachFailure.getFirst());
+          failureBuilder.setStartRowNum(eachFailure.getSecond().getFirst());
+          failureBuilder.setEndRowNum(eachFailure.getSecond().getSecond());
+          failureIntermediateItems.add(failureBuilder.build());
+        }
+        intermediateBuilder.clear();
+
+        intermediateBuilder.setEbId(ebId.getProto())
+            .setHost(getWorkerContext().getTajoWorkerManagerService().getBindAddr().getHostName() + ":" +
+                getWorkerContext().getPullServerPort())
+            .setTaskId(-1)
+            .setAttemptId(-1)
+            .setPartId(eachShuffle.getPartId())
+            .setVolume(eachShuffle.getVolume())
+            .addAllPages(pages)
+            .addAllFailures(failureIntermediateItems);
+        intermediateEntries.add(intermediateBuilder.build());
+      }
+
+      // send intermediateEntries to QueryMaster
+      reporterBuilder.addAllIntermediateEntries(intermediateEntries);
+
+    } catch (Throwable e) {
+      LOG.error(e.getMessage(), e);
+      reporterBuilder.setReportSuccess(false);
+      if (e.getMessage() == null) {
+        reporterBuilder.setReportErrorMessage(e.getClass().getSimpleName());
+      } else {
+        reporterBuilder.setReportErrorMessage(e.getMessage());
+      }
+    }
+    try {
+      sendExecutionBlockReport(reporterBuilder.build());
+    } catch (Throwable e) {
+      // can't send report to query master
+      LOG.fatal(e.getMessage(), e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected class Reporter {
+    private Thread reporterThread;
+    private AtomicBoolean reporterStop = new AtomicBoolean();
+    private static final int PROGRESS_INTERVAL = 1000;
+    private static final int MAX_RETRIES = 10;
+
+    public Reporter() {
+      this.reporterThread = new Thread(createReporterThread());
+      this.reporterThread.setName("Task reporter");
+    }
+
+    public void startReporter(){
+      this.reporterThread.start();
+    }
+
+    Runnable createReporterThread() {
+
+      return new Runnable() {
+        int remainingRetries = MAX_RETRIES;
+        QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub;
+        @Override
+        public void run() {
+          while (!reporterStop.get() && !Thread.interrupted()) {
+            try {
+              masterStub = getQueryMasterStub();
+
+              if(tasks.size() == 0){
+                masterStub.ping(null, getExecutionBlockId().getProto(), NullCallback.get());
+              } else {
+                for (Task task : new ArrayList<Task>(tasks.values())){
+
+                  if (task.isRunning() && task.isProgressChanged()) {
+                    task.updateProgress();
+                    masterStub.statusUpdate(null, task.getReport(), NullCallback.get());
+                    task.getContext().setProgressChanged(false);
+                  } else {
+                    task.updateProgress();
+                  }
+                }
+              }
+            } catch (Throwable t) {
+              LOG.error(t.getMessage(), t);
+              remainingRetries -=1;
+              if (remainingRetries == 0) {
+                ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
+                LOG.warn("Last retry, exiting ");
+                throw new RuntimeException(t);
+              }
+            } finally {
+              if (remainingRetries > 0 && !reporterStop.get()) {
+                synchronized (reporterThread) {
+                  try {
+                    reporterThread.wait(PROGRESS_INTERVAL);
+                  } catch (InterruptedException e) {
+                  }
+                }
+              }
+            }
+          }
+        }
+      };
+    }
+
+    public void stop() throws InterruptedException {
+      if (reporterStop.getAndSet(true)) {
+        return;
+      }
+
+      if (reporterThread != null) {
+        // Intent of the lock is to not send an interupt in the middle of an
+        // umbilical.ping or umbilical.statusUpdate
+        synchronized (reporterThread) {
+          //Interrupt if sleeping. Otherwise wait for the RPC call to return.
+          reporterThread.notifyAll();
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
index a70fbfd..e77e265 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
@@ -32,14 +32,12 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.util.Pair;
 
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class ExecutionBlockSharedResource {
   private static Log LOG = LogFactory.getLog(ExecutionBlockSharedResource.class);
   private AtomicBoolean initializing = new AtomicBoolean(false);
   private volatile Boolean resourceInitSuccess = new Boolean(false);
-  private CountDownLatch initializedResourceLatch = new CountDownLatch(1);
 
   // Query
   private QueryContext context;
@@ -50,27 +48,18 @@ public class ExecutionBlockSharedResource {
   private LogicalNode plan;
   private boolean codeGenEnabled = false;
 
-  public void initialize(final QueryContext context, final String planJson) throws InterruptedException {
+  public void initialize(final QueryContext context, final String planJson) {
 
     if (!initializing.getAndSet(true)) {
-      Thread thread = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            ExecutionBlockSharedResource.this.context = context;
-            initPlan(planJson);
-            initCodeGeneration();
-            resourceInitSuccess = true;
-          } catch (Throwable t) {
-            LOG.error(t);
-            LOG.error(ExceptionUtils.getStackTrace(t));
-          } finally {
-            initializedResourceLatch.countDown();
-          }
-        }
-      });
-      thread.run();
-      thread.join();
+      try {
+        ExecutionBlockSharedResource.this.context = context;
+        initPlan(planJson);
+        initCodeGeneration();
+        resourceInitSuccess = true;
+      } catch (Throwable t) {
+        LOG.error(t);
+        LOG.error(ExceptionUtils.getStackTrace(t));
+      }
 
       if (!resourceInitSuccess) {
         throw new RuntimeException("Resource cannot be initialized");
@@ -91,11 +80,6 @@ public class ExecutionBlockSharedResource {
     }
   }
 
-  public boolean awaitInitializedResource() throws InterruptedException {
-    initializedResourceLatch.await();
-    return resourceInitSuccess;
-  }
-
   public LogicalNode getPlan() {
     return this.plan;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index aaff69c..2cc8f0c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.worker;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.*;
 import org.apache.tajo.master.event.ContainerAllocationEvent;
 import org.apache.tajo.master.event.ContainerAllocatorEventType;
@@ -44,14 +46,13 @@ import org.apache.tajo.master.rm.Worker;
 import org.apache.tajo.master.rm.WorkerResource;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.util.ApplicationIdUtils;
 import org.apache.tajo.util.HAServiceUtil;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.net.InetSocketAddress;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -144,6 +145,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
         launchTaskRunners(launchEvent);
       } else if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_CLEANUP) {
         stopContainers(event.getContainers());
+        stopExecutionBlock(event.getExecutionBlockId(), event.getContainers());
       }
     }
   }
@@ -158,6 +160,37 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     }
   }
 
+  public void stopExecutionBlock(final ExecutionBlockId executionBlockId, Collection<Container> containers) {
+    Set<NodeId> workers = Sets.newHashSet();
+    for (Container container : containers){
+      workers.add(container.getNodeId());
+    }
+
+    for (final NodeId worker : workers) {
+      executorService.submit(new Runnable() {
+        @Override
+        public void run() {
+          stopExecutionBlock(executionBlockId, worker);
+        }
+      });
+    }
+  }
+
+  private void stopExecutionBlock(ExecutionBlockId executionBlockId, NodeId worker) {
+    NettyClientBase tajoWorkerRpc = null;
+    try {
+      InetSocketAddress addr = new InetSocketAddress(worker.getHost(), worker.getPort());
+      tajoWorkerRpc = RpcConnectionPool.getPool(tajoConf).getConnection(addr, TajoWorkerProtocol.class, true);
+      TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
+
+      tajoWorkerRpcClient.stopExecutionBlock(null, executionBlockId.getProto(), NullCallback.get());
+    } catch (Throwable e) {
+      LOG.error(e.getMessage(), e);
+    } finally {
+      RpcConnectionPool.getPool(tajoConf).releaseConnection(tajoWorkerRpc);
+    }
+  }
+
   protected static class LaunchRunner implements Runnable {
     private final ContainerProxy proxy;
     private final ContainerId id;

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 584c60e..a8d661b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -27,15 +27,12 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.shell.PathData;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoProtos;
 import org.apache.tajo.catalog.CatalogClient;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.ha.TajoMasterInfo;
 import org.apache.tajo.master.querymaster.QueryMaster;
@@ -45,8 +42,11 @@ import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.util.*;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.HAServiceUtil;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.StringUtils;
 import org.apache.tajo.util.metrics.TajoSystemMetrics;
 import org.apache.tajo.webapp.StaticHttpServer;
 
@@ -57,7 +57,6 @@ import java.lang.management.ThreadMXBean;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -125,6 +124,10 @@ public class TajoWorker extends CompositeService {
 
   private HashShuffleAppenderManager hashShuffleAppenderManager;
 
+  private AsyncDispatcher dispatcher;
+
+  private LocalDirAllocator lDirAllocator;
+
   public TajoWorker() throws Exception {
     super(TajoWorker.class.getName());
   }
@@ -166,7 +169,7 @@ public class TajoWorker extends CompositeService {
   }
   
   @Override
-  public void init(Configuration conf) {
+  public void serviceInit(Configuration conf) throws Exception {
     Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
 
     this.systemConf = (TajoConf)conf;
@@ -174,6 +177,7 @@ public class TajoWorker extends CompositeService {
 
     this.connPool = RpcConnectionPool.getPool(systemConf);
     this.workerContext = new WorkerContext();
+    this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
 
     String resourceManagerClassName = systemConf.getVar(ConfVars.RESOURCE_MANAGER_CLASS);
 
@@ -192,6 +196,9 @@ public class TajoWorker extends CompositeService {
       systemConf.setIntVar(ConfVars.PULLSERVER_PORT, 0);
     }
 
+    this.dispatcher = new AsyncDispatcher();
+    addIfService(dispatcher);
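+
+The worker now owns an AsyncDispatcher that is registered as a child service and handed to the TaskRunnerManager, which consumes TaskRunnerStartEvent/TaskRunnerStopEvent from it. A minimal sketch of how such a dispatcher is typically wired into a CompositeService; the event type and handler below are illustrative, only the Hadoop service/event classes themselves are the real APIs used here:
+
+    import org.apache.hadoop.conf.Configuration;
+    import org.apache.hadoop.service.CompositeService;
+    import org.apache.hadoop.yarn.event.AbstractEvent;
+    import org.apache.hadoop.yarn.event.AsyncDispatcher;
+    import org.apache.hadoop.yarn.event.EventHandler;
+
+    public class DispatcherWiringSketch extends CompositeService {
+      enum WorkerEventType { EB_START, EB_STOP }               // illustrative event types
+
+      static class WorkerEvent extends AbstractEvent<WorkerEventType> {
+        WorkerEvent(WorkerEventType type) { super(type); }
+      }
+
+      private AsyncDispatcher dispatcher;
+
+      public DispatcherWiringSketch() { super("DispatcherWiringSketch"); }
+
+      @Override
+      public void serviceInit(Configuration conf) throws Exception {
+        dispatcher = new AsyncDispatcher();
+        addService(dispatcher);                                // parent manages its lifecycle
+        // route worker events to a handler, as TaskRunnerManager does for TaskRunnerEvents
+        dispatcher.register(WorkerEventType.class, new EventHandler<WorkerEvent>() {
+          @Override
+          public void handle(WorkerEvent event) {
+            // start or stop the matching execution block here
+          }
+        });
+        super.serviceInit(conf);
+      }
+
+      // posting an event from an RPC handler is a single, non-blocking call
+      public void fireStop() {
+        dispatcher.getEventHandler().handle(new WorkerEvent(WorkerEventType.EB_STOP));
+      }
+    }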
+
     // querymaster worker
     tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort);
     addService(tajoWorkerClientService);
@@ -200,7 +207,7 @@ public class TajoWorker extends CompositeService {
     addService(queryMasterManagerService);
 
     // taskrunner worker
-    taskRunnerManager = new TaskRunnerManager(workerContext);
+    taskRunnerManager = new TaskRunnerManager(workerContext, dispatcher);
     addService(taskRunnerManager);
 
     tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
@@ -240,30 +247,19 @@ public class TajoWorker extends CompositeService {
         ",yarnContainer=" + yarnContainerMode + ", clientPort=" + clientPort +
         ", peerRpcPort=" + peerRpcPort + ":" + qmManagerPort + ",httpPort" + httpPort);
 
-    super.init(conf);
+    super.serviceInit(conf);
 
     tajoMasterInfo = new TajoMasterInfo();
-    if(yarnContainerMode && queryMasterMode) {
-      tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(cmdArgs[2]));
-      connectToCatalog();
-
-      QueryId queryId = TajoIdUtils.parseQueryId(cmdArgs[1]);
-      queryMasterManagerService.getQueryMaster().reportQueryStatusToQueryMaster(
-          queryId, TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
-    } else if(yarnContainerMode && taskRunnerMode) { //TaskRunner mode
-      taskRunnerManager.startTask(cmdArgs);
+    if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+      tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
+      tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
     } else {
-      if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-        tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-        tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
-      } else {
-        tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
-            .TAJO_MASTER_UMBILICAL_RPC_ADDRESS)));
-        tajoMasterInfo.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
-            .RESOURCE_TRACKER_RPC_ADDRESS)));
-      }
-      connectToCatalog();
+      tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
+          .TAJO_MASTER_UMBILICAL_RPC_ADDRESS)));
+      tajoMasterInfo.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
+          .RESOURCE_TRACKER_RPC_ADDRESS)));
     }
+    connectToCatalog();
 
     workerHeartbeatThread = new WorkerHeartbeatService(workerContext);
     workerHeartbeatThread.init(conf);
@@ -309,13 +305,13 @@ public class TajoWorker extends CompositeService {
   }
 
   @Override
-  public void start() {
-    super.start();
+  public void serviceStart() throws Exception {
     initWorkerMetrics();
+    super.serviceStart();
   }
 
   @Override
-  public void stop() {
+  public void serviceStop() throws Exception {
     if(stopped.getAndSet(true)) {
       return;
     }
@@ -349,14 +345,11 @@ public class TajoWorker extends CompositeService {
     }
 
     if(deletionService != null) deletionService.stop();
-    super.stop();
+    super.serviceStop();
     LOG.info("TajoWorker main thread exiting");
   }
 
   public class WorkerContext {
-    private ConcurrentHashMap<ExecutionBlockId, ExecutionBlockSharedResource> sharedResourceMap =
-        new ConcurrentHashMap<ExecutionBlockId, ExecutionBlockSharedResource>();
-
     public QueryMaster getQueryMaster() {
       if (queryMasterManagerService == null) {
         return null;
@@ -407,23 +400,8 @@ public class TajoWorker extends CompositeService {
       }
     }
 
-    public void initSharedResource(QueryContext queryContext, ExecutionBlockId blockId, String planJson)
-        throws InterruptedException {
-
-      if (!sharedResourceMap.containsKey(blockId)) {
-        ExecutionBlockSharedResource resource = new ExecutionBlockSharedResource();
-        if (sharedResourceMap.putIfAbsent(blockId, resource) == null) {
-          resource.initialize(queryContext, planJson);
-        }
-      }
-    }
-
-    public ExecutionBlockSharedResource getSharedResource(ExecutionBlockId blockId) {
-      return sharedResourceMap.get(blockId);
-    }
-
-    public void releaseSharedResource(ExecutionBlockId blockId) {
-      sharedResourceMap.remove(blockId).release();
+    public LocalDirAllocator getLocalDirAllocator(){
+      return lDirAllocator;
     }
 
     protected void cleanup(String strPath) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index fa116c3..472ce1b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -25,14 +25,18 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.tajo.*;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.worker.event.TaskRunnerStartEvent;
+import org.apache.tajo.worker.event.TaskRunnerStopEvent;
 
 import java.net.InetSocketAddress;
 
@@ -112,19 +116,16 @@ public class TajoWorkerManagerService extends CompositeService
   }
 
   @Override
-  public void executeExecutionBlock(RpcController controller,
+  public void startExecutionBlock(RpcController controller,
                                     TajoWorkerProtocol.RunExecutionBlockRequestProto request,
                                     RpcCallback<PrimitiveProtos.BoolProto> done) {
     workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc();
 
     try {
-      workerContext.initSharedResource(
-          new QueryContext(workerContext.getConf(), request.getQueryContext()),
-          TajoIdUtils.createExecutionBlockId(request.getExecutionBlockId()), request.getPlanJson());
 
       String[] params = new String[7];
       params[0] = "standby";  //mode(never used)
-      params[1] = request.getExecutionBlockId();
+      params[1] = request.getExecutionBlockId().toString();
       // NodeId has a form of hostname:port.
       params[2] = request.getNodeId();
       params[3] = request.getContainerId();
@@ -133,7 +134,14 @@ public class TajoWorkerManagerService extends CompositeService
       params[4] = request.getQueryMasterHost();
       params[5] = String.valueOf(request.getQueryMasterPort());
       params[6] = request.getQueryOutputPath();
-      workerContext.getTaskRunnerManager().startTask(params);
+
+      ExecutionBlockId executionBlockId = new ExecutionBlockId(request.getExecutionBlockId());
+      workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStartEvent(
+          params,
+          executionBlockId,
+          new QueryContext(workerContext.getConf(), request.getQueryContext()),
+          request.getPlanJson()
+      ));
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Throwable t) {
       LOG.error(t.getMessage(), t);
@@ -142,6 +150,21 @@ public class TajoWorkerManagerService extends CompositeService
   }
 
   @Override
+  public void stopExecutionBlock(RpcController controller,
+                                 TajoIdProtos.ExecutionBlockIdProto requestProto,
+                                 RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStopEvent(
+          new ExecutionBlockId(requestProto)
+      ));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
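+
+Both RPC handlers above acknowledge as soon as the event is queued; the events themselves are plain carriers. The start event is constructed with the launch params, the ExecutionBlockId, a QueryContext, and the plan JSON, while the stop event only needs the block id. An illustrative shape for such a stop-style carrier (the enum and class names here are assumptions, not the actual TaskRunnerStopEvent source):
+
+    import org.apache.hadoop.yarn.event.AbstractEvent;
+
+    enum BlockEventType { EB_START, EB_STOP }          // illustrative event-type enum
+
+    public class BlockStopEventSketch extends AbstractEvent<BlockEventType> {
+      private final String executionBlockId;           // stand-in for ExecutionBlockId
+
+      public BlockStopEventSketch(String executionBlockId) {
+        super(BlockEventType.EB_STOP);
+        this.executionBlockId = executionBlockId;
+      }
+
+      public String getExecutionBlockId() {
+        return executionBlockId;
+      }
+    }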
+
+  @Override
   public void killTaskAttempt(RpcController controller, TajoIdProtos.QueryUnitAttemptIdProto request,
                               RpcCallback<PrimitiveProtos.BoolProto> done) {
     Task task = workerContext.getTaskRunnerManager().getTaskByQueryUnitAttemptId(new QueryUnitAttemptId(request));
@@ -162,13 +185,10 @@ public class TajoWorkerManagerService extends CompositeService
                                      TajoWorkerProtocol.ExecutionBlockListProto ebIds,
                                      RpcCallback<PrimitiveProtos.BoolProto> done) {
     for (TajoIdProtos.ExecutionBlockIdProto executionBlockIdProto : ebIds.getExecutionBlockIdList()) {
-      String inputDir = TaskRunner.getBaseInputDir(new ExecutionBlockId(executionBlockIdProto)).toString();
+      String inputDir = ExecutionBlockContext.getBaseInputDir(new ExecutionBlockId(executionBlockIdProto)).toString();
       workerContext.cleanup(inputDir);
-      String outputDir = TaskRunner.getBaseOutputDir(new ExecutionBlockId(executionBlockIdProto)).toString();
+      String outputDir = ExecutionBlockContext.getBaseOutputDir(new ExecutionBlockId(executionBlockIdProto)).toString();
       workerContext.cleanup(outputDir);
-
-      // Release shared resources
-      workerContext.releaseSharedResource(new ExecutionBlockId(executionBlockIdProto));
     }
     done.run(TajoWorker.TRUE_PROTO);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index d0665ae..7b4cbe1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -25,8 +25,10 @@ import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoProtos;
@@ -43,11 +45,10 @@ import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.engine.query.QueryUnitRequest;
-import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
 import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.fragment.FileFragment;
@@ -59,7 +60,7 @@ import java.net.URI;
 import java.text.NumberFormat;
 import java.util.*;
 import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ExecutorService;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 
@@ -69,11 +70,9 @@ public class Task {
 
   private final TajoConf systemConf;
   private final QueryContext queryContext;
-  private final FileSystem localFS;
-  private TaskRunner.TaskRunnerContext taskRunnerContext;
-  private final QueryMasterProtocolService.Interface masterProxy;
-  private final LocalDirAllocator lDirAllocator;
+  private final ExecutionBlockContext executionBlockContext;
   private final QueryUnitAttemptId taskId;
+  private final String taskRunnerId;
 
   private final Path taskDir;
   private final QueryUnitRequest request;
@@ -85,7 +84,6 @@ public class Task {
   private boolean interQuery;
   private boolean killed = false;
   private boolean aborted = false;
-  private final Reporter reporter;
   private Path inputTableBaseDir;
 
   private long startTime;
@@ -97,7 +95,6 @@ public class Task {
   private ShuffleType shuffleType = null;
   private Schema finalSchema = null;
   private TupleComparator sortComp = null;
-  private ClientSocketChannelFactory channelFactory = null;
 
   static final String OUTPUT_FILE_PREFIX="part-";
   static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
@@ -132,45 +129,27 @@ public class Task {
         }
       };
 
-  public Task(QueryUnitAttemptId taskId,
-              final TaskRunner.TaskRunnerContext worker,
-              final QueryMasterProtocolService.Interface masterProxy,
+  public Task(String taskRunnerId,
+              Path baseDir,
+              QueryUnitAttemptId taskId,
+              final ExecutionBlockContext executionBlockContext,
               final QueryUnitRequest request) throws IOException {
+    this.taskRunnerId = taskRunnerId;
     this.request = request;
     this.taskId = taskId;
 
-    this.systemConf = worker.getConf();
+    this.systemConf = executionBlockContext.getConf();
     this.queryContext = request.getQueryContext();
-    this.taskRunnerContext = worker;
-    this.masterProxy = masterProxy;
-    this.localFS = worker.getLocalFS();
-    this.lDirAllocator = worker.getLocalDirAllocator();
-    this.taskDir = StorageUtil.concatPath(taskRunnerContext.getBaseDir(),
+    this.executionBlockContext = executionBlockContext;
+    this.taskDir = StorageUtil.concatPath(baseDir,
         taskId.getQueryUnitId().getId() + "_" + taskId.getId());
 
-    this.context = new TaskAttemptContext(queryContext, worker.getWorkerContext(), taskId,
+    this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskId,
         request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
     this.context.setDataChannel(request.getDataChannel());
     this.context.setEnforcer(request.getEnforcer());
     this.inputStats = new TableStats();
 
-    this.reporter = new Reporter(taskId, masterProxy);
-    this.reporter.startCommunicationThread();
-
-
-    // resource intiailization
-    boolean resourceInitialized = false;
-    try {
-      resourceInitialized = context.getSharedResource().awaitInitializedResource();
-    } catch (InterruptedException e) {
-      LOG.error("Failed Resource Initialization", e);
-    } finally {
-      if (!resourceInitialized) {
-        setState(TaskAttemptState.TA_FAILED);
-        return;
-      }
-    }
-
     plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class);
     LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
     if (scanNode != null) {
@@ -234,11 +213,12 @@ public class Task {
   public void init() throws IOException {
     if (context.getState() == TaskAttemptState.TA_PENDING) {
       // initialize a task temporal dir
+      FileSystem localFS = executionBlockContext.getLocalFS();
       localFS.mkdirs(taskDir);
 
       if (request.getFetches().size() > 0) {
         inputTableBaseDir = localFS.makeQualified(
-            lDirAllocator.getLocalPathForWrite(
+            executionBlockContext.getLocalDirAllocator().getLocalPathForWrite(
                 getTaskAttemptDir(context.getTaskId()).toString(), systemConf));
         localFS.mkdirs(inputTableBaseDir);
         Path tableDir;
@@ -296,8 +276,9 @@ public class Task {
   }
 
   public void fetch() {
+    ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher();
     for (Fetcher f : fetcherRunners) {
-      taskRunnerContext.getFetchLauncher().submit(new FetchRunner(context, f));
+      executorService.submit(new FetchRunner(context, f));
     }
   }
 
@@ -305,25 +286,18 @@ public class Task {
     killed = true;
     context.stop();
     context.setState(TaskAttemptState.TA_KILLED);
-    releaseChannelFactory();
   }
 
   public void abort() {
     aborted = true;
     context.stop();
-    releaseChannelFactory();
   }
 
   public void cleanUp() {
     // remove itself from worker
     if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
-      try {
-        localFS.delete(context.getWorkDir(), true);
-        synchronized (taskRunnerContext.getTasks()) {
-          taskRunnerContext.getTasks().remove(this.getId());
-        }
-      } catch (IOException e) {
-        LOG.error(e.getMessage(), e);
+      synchronized (executionBlockContext.getTasks()) {
+        executionBlockContext.getTasks().remove(this.getId());
       }
     } else {
       LOG.error("QueryUnitAttemptId: " + context.getTaskId() + " status: " + context.getState());
@@ -332,7 +306,7 @@ public class Task {
 
   public TaskStatusProto getReport() {
     TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
-    builder.setWorkerName(taskRunnerContext.getNodeId());
+    builder.setWorkerName(executionBlockContext.getTaskRunner(taskRunnerId).getNodeId().toString());
     builder.setId(context.getTaskId().getProto())
         .setProgress(context.getProgress())
         .setState(context.getState());
@@ -345,6 +319,23 @@ public class Task {
     return builder.build();
   }
 
+  public boolean isRunning(){
+    return context.getState() == TaskAttemptState.TA_RUNNING;
+  }
+  public boolean isProgressChanged() {
+    return context.isProgressChanged();
+  }
+
+  public void updateProgress() {
+    if(killed || aborted){
+      return;
+    }
+
+    if (executor != null && context.getProgress() < 1.0f) {
+      context.setExecutorProgress(executor.getProgress());
+    }
+  }
+
   private CatalogProtos.TableStatsProto reloadInputStats() {
     synchronized(inputStats) {
       if (this.executor == null) {
@@ -419,12 +410,11 @@ public class Task {
       FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
       context.updateAssignedFragments(inputTable, frags);
     }
-    releaseChannelFactory();
   }
 
-  public void run() {
+  public void run() throws Exception {
     startTime = System.currentTimeMillis();
-    Exception error = null;
+    Throwable error = null;
     try {
       context.setState(TaskAttemptState.TA_RUNNING);
 
@@ -433,16 +423,17 @@ public class Task {
         // complete.
         waitForFetch();
         context.setFetcherProgress(FETCHER_PROGRESS);
-        context.setProgress(FETCHER_PROGRESS);
+        context.setProgressChanged(true);
+        updateProgress();
       }
 
-      this.executor = taskRunnerContext.getTQueryEngine().
+      this.executor = executionBlockContext.getTQueryEngine().
           createPlan(context, plan);
       this.executor.init();
 
-      while(!killed && executor.next() != null) {
+      while(!killed && !aborted && executor.next() != null) {
       }
-    } catch (Exception e) {
+    } catch (Throwable e) {
       error = e ;
       LOG.error(e.getMessage(), e);
       aborted = true;
@@ -452,22 +443,20 @@ public class Task {
           executor.close();
           reloadInputStats();
         } catch (IOException e) {
-          e.printStackTrace();
+          LOG.error(e);
         }
         this.executor = null;
       }
 
-      context.setProgress(1.0f);
-      taskRunnerContext.completedTasksNum.incrementAndGet();
+      executionBlockContext.completedTasksNum.incrementAndGet();
       context.getHashShuffleAppenderManager().finalizeTask(taskId);
-
+      QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getQueryMasterStub();
       if (killed || aborted) {
         context.setExecutorProgress(0.0f);
-        context.setProgress(0.0f);
         if(killed) {
           context.setState(TaskAttemptState.TA_KILLED);
-          masterProxy.statusUpdate(null, getReport(), NullCallback.get());
-          taskRunnerContext.killedTasksNum.incrementAndGet();
+          queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
+          executionBlockContext.killedTasksNum.incrementAndGet();
         } else {
           context.setState(TaskAttemptState.TA_FAILED);
           TaskFatalErrorReport.Builder errorBuilder =
@@ -482,47 +471,31 @@ public class Task {
             errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
           }
 
-          masterProxy.fatalError(null, errorBuilder.build(), NullCallback.get());
-          taskRunnerContext.failedTasksNum.incrementAndGet();
-        }
-
-        // stopping the status report
-        try {
-          reporter.stopCommunicationThread();
-        } catch (InterruptedException e) {
-          LOG.warn(e);
+          queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
+          executionBlockContext.failedTasksNum.incrementAndGet();
         }
-
       } else {
         // if successful
         context.setProgress(1.0f);
         context.setState(TaskAttemptState.TA_SUCCEEDED);
-
-        // stopping the status report
-        try {
-          reporter.stopCommunicationThread();
-        } catch (InterruptedException e) {
-          LOG.warn(e);
-        }
+        executionBlockContext.succeededTasksNum.incrementAndGet();
 
         TaskCompletionReport report = getTaskCompletionReport();
-        masterProxy.done(null, report, NullCallback.get());
-        taskRunnerContext.succeededTasksNum.incrementAndGet();
+        queryMasterStub.done(null, report, NullCallback.get());
       }
       finishTime = System.currentTimeMillis();
       LOG.info(context.getTaskId() + " completed. " +
-          "Worker's task counter - total:" + taskRunnerContext.completedTasksNum.intValue() +
-          ", succeeded: " + taskRunnerContext.succeededTasksNum.intValue()
-          + ", killed: " + taskRunnerContext.killedTasksNum.intValue()
-          + ", failed: " + taskRunnerContext.failedTasksNum.intValue());
+          "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
+          ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
+          + ", killed: " + executionBlockContext.killedTasksNum.intValue()
+          + ", failed: " + executionBlockContext.failedTasksNum.intValue());
       cleanupTask();
     }
   }
 
   public void cleanupTask() {
-    taskRunnerContext.addTaskHistory(getId(), createTaskHistory());
-    taskRunnerContext.getTasks().remove(getId());
-    taskRunnerContext = null;
+    executionBlockContext.addTaskHistory(taskRunnerId, getId(), createTaskHistory());
+    executionBlockContext.getTasks().remove(getId());
 
     fetcherRunners.clear();
     fetcherRunners = null;
@@ -532,11 +505,8 @@ public class Task {
         executor = null;
       }
     } catch (IOException e) {
-      e.printStackTrace();
+      LOG.fatal(e.getMessage(), e);
     }
-    plan = null;
-    context = null;
-    releaseChannelFactory();
   }
 
   public TaskHistory createTaskHistory() {
@@ -577,7 +547,7 @@ public class Task {
         taskHistory.setFinishedFetchCount(i);
       }
     } catch (Exception e) {
-      e.printStackTrace();
+      LOG.warn(e.getMessage(), e);
     }
 
     return taskHistory;
@@ -673,7 +643,11 @@ public class Task {
 
   @VisibleForTesting
   public static float adjustFetchProcess(int totalFetcher, int remainFetcher) {
-    return ((float)(totalFetcher - remainFetcher)) / (float)totalFetcher * FETCHER_PROGRESS;
+    if (totalFetcher > 0) {
+      return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS;
+    } else {
+      return 0.0f;
+    }
   }
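 
 The guard above fixes the degenerate case of a task with zero fetchers, which previously produced NaN. A tiny worked example, assuming FETCHER_PROGRESS is 0.5f (the same weight the executor half gets in TaskAttemptContext below):
 
    public class FetchProgressSketch {
      static final float FETCHER_PROGRESS = 0.5f;    // assumed value of the Task constant

      static float adjustFetchProcess(int totalFetcher, int remainFetcher) {
        if (totalFetcher > 0) {
          return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS;
        }
        return 0.0f;                                  // previously (0 - 0) / 0f gave NaN
      }

      public static void main(String[] args) {
        System.out.println(adjustFetchProcess(4, 1)); // 3 of 4 fetches done -> 0.375
        System.out.println(adjustFetchProcess(0, 0)); // no fetch phase -> 0.0, not NaN
      }
    }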
 
   private synchronized void fetcherFinished(TaskAttemptContext ctx) {
@@ -681,24 +655,15 @@ public class Task {
     if(fetcherSize == 0) {
       return;
     }
-    try {
-      int numRunningFetcher = (int)(ctx.getFetchLatch().getCount()) - 1;
 
-      if (numRunningFetcher == 0) {
-        context.setProgress(FETCHER_PROGRESS);
-      } else {
-        context.setProgress(adjustFetchProcess(fetcherSize, numRunningFetcher));
-      }
-    } finally {
-      ctx.getFetchLatch().countDown();
-    }
-  }
+    ctx.getFetchLatch().countDown();
 
-  private void releaseChannelFactory(){
-    if(channelFactory != null) {
-      channelFactory.shutdown();
-      channelFactory.releaseExternalResources();
-      channelFactory = null;
+    int remainFetcher = (int) ctx.getFetchLatch().getCount();
+    if (remainFetcher == 0) {
+      context.setFetcherProgress(FETCHER_PROGRESS);
+    } else {
+      context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher));
+      context.setProgressChanged(true);
     }
   }
 
@@ -706,15 +671,9 @@ public class Task {
                                         List<FetchImpl> fetches) throws IOException {
 
     if (fetches.size() > 0) {
-
-      releaseChannelFactory();
-
-
-      int workerNum = ctx.getConf().getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM);
-      channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", workerNum);
-      Path inputDir = lDirAllocator.
-          getLocalPathToRead(
-              getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
+      ClientSocketChannelFactory channelFactory = executionBlockContext.getShuffleChannelFactory();
+      Path inputDir = executionBlockContext.getLocalDirAllocator().
+          getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
       File storeDir;
 
       int i = 0;
@@ -739,91 +698,9 @@ public class Task {
     }
   }
 
-  protected class Reporter {
-    private QueryMasterProtocolService.Interface masterStub;
-    private Thread pingThread;
-    private AtomicBoolean stop = new AtomicBoolean(false);
-    private static final int PROGRESS_INTERVAL = 3000;
-    private static final int MAX_RETRIES = 3;
-    private QueryUnitAttemptId taskId;
-
-    public Reporter(QueryUnitAttemptId taskId, QueryMasterProtocolService.Interface masterStub) {
-      this.taskId = taskId;
-      this.masterStub = masterStub;
-    }
-
-    Runnable createReporterThread() {
-
-      return new Runnable() {
-        int remainingRetries = MAX_RETRIES;
-        @Override
-        public void run() {
-          while (!stop.get() && !context.isStopped()) {
-            try {
-              if(executor != null && context.getProgress() < 1.0f) {
-                float progress = executor.getProgress();
-                context.setExecutorProgress(progress);
-              }
-            } catch (Throwable t) {
-              LOG.error("Get progress error: " + t.getMessage(), t);
-            }
-
-            try {
-              if (context.isPorgressChanged()) {
-                masterStub.statusUpdate(null, getReport(), NullCallback.get());
-              } else {
-                masterStub.ping(null, taskId.getProto(), NullCallback.get());
-              }
-            } catch (Throwable t) {
-              LOG.error(t.getMessage(), t);
-              remainingRetries -=1;
-              if (remainingRetries == 0) {
-                ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
-                LOG.warn("Last retry, exiting ");
-                throw new RuntimeException(t);
-              }
-            } finally {
-              if (!context.isStopped() && remainingRetries > 0) {
-                synchronized (pingThread) {
-                  try {
-                    pingThread.wait(PROGRESS_INTERVAL);
-                  } catch (InterruptedException e) {
-                  }
-                }
-              }
-            }
-          }
-        }
-      };
-    }
-
-    public void startCommunicationThread() {
-      if (pingThread == null) {
-        pingThread = new Thread(createReporterThread());
-        pingThread.setName("communication thread");
-        pingThread.start();
-      }
-    }
-
-    public void stopCommunicationThread() throws InterruptedException {
-      if(stop.getAndSet(true)){
-        return;
-      }
-
-      if (pingThread != null) {
-        // Intent of the lock is to not send an interupt in the middle of an
-        // umbilical.ping or umbilical.statusUpdate
-        synchronized(pingThread) {
-          //Interrupt if sleeping. Otherwise wait for the RPC call to return.
-          pingThread.notifyAll();
-        }
-      }
-    }
-  }
-
   public static Path getTaskAttemptDir(QueryUnitAttemptId quid) {
     Path workDir =
-        StorageUtil.concatPath(TaskRunner.getBaseInputDir(quid.getQueryUnitId().getExecutionBlockId()),
+        StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getQueryUnitId().getExecutionBlockId()),
             String.valueOf(quid.getQueryUnitId().getId()),
             String.valueOf(quid.getId()));
     return workDir;
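 
 With the per-task Reporter thread removed, Task now only exposes isRunning(), isProgressChanged(), updateProgress(), and getReport(), which suggests a single shared reporting loop per execution block polls every task. The following is a hedged sketch of such a loop; everything other than those four accessors and the 3-second interval from the removed Reporter (the stub interface and class names in particular) is an assumption about code outside this diff:
 
    import java.util.Collection;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class SharedReporterSketch {
      interface ReportableTask {                       // the subset of Task used here
        boolean isRunning();
        boolean isProgressChanged();
        void updateProgress();
        Object getReport();                            // stand-in for TaskStatusProto
      }
      interface MasterStub {                           // stand-in for the QueryMaster stub
        void statusUpdate(Object report);
      }

      private final AtomicBoolean stop = new AtomicBoolean(false);
      private static final int PROGRESS_INTERVAL_MS = 3000;   // interval of the old Reporter

      void runLoop(Collection<ReportableTask> tasks, MasterStub master) throws InterruptedException {
        while (!stop.get()) {
          for (ReportableTask task : tasks) {
            if (!task.isRunning()) continue;
            task.updateProgress();                     // pull progress from the executor
            if (task.isProgressChanged()) {
              master.statusUpdate(task.getReport());   // report only when something changed
            }
          }
          Thread.sleep(PROGRESS_INTERVAL_MS);
        }
      }

      void shutdown() { stop.set(true); }
    }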

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index d27fd6d..422ec2b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -83,15 +83,15 @@ public class TaskAttemptContext {
   private Map<Integer, Long> partitionOutputVolume;
   private HashShuffleAppenderManager hashShuffleAppenderManager;
 
-  public TaskAttemptContext(QueryContext queryContext, final WorkerContext workerContext,
+  public TaskAttemptContext(QueryContext queryContext, final ExecutionBlockContext executionBlockContext,
                             final QueryUnitAttemptId queryId,
                             final FragmentProto[] fragments,
                             final Path workDir) {
     this.queryContext = queryContext;
 
-    if (workerContext != null) { // For unit tests
-      this.workerContext = workerContext;
-      this.sharedResource = workerContext.getSharedResource(queryId.getQueryUnitId().getExecutionBlockId());
+    if (executionBlockContext != null) { // For unit tests
+      this.workerContext = executionBlockContext.getWorkerContext();
+      this.sharedResource = executionBlockContext.getSharedResource();
     }
 
     this.queryId = queryId;
@@ -315,22 +315,45 @@ public class TaskAttemptContext {
   public float getProgress() {
     return this.progress;
   }
-  
+
   public void setProgress(float progress) {
     float previousProgress = this.progress;
-    this.progress = progress;
-    progressChanged.set(previousProgress != progress);
+
+    if (Float.isNaN(progress) || Float.isInfinite(progress)) {
+      this.progress = 0.0f;
+    } else {
+      this.progress = progress;
+    }
+
+    if (previousProgress != progress) {
+      setProgressChanged(true);
+    }
   }
 
-  public boolean isPorgressChanged() {
+  public boolean isProgressChanged() {
     return progressChanged.get();
   }
+
+  public void setProgressChanged(boolean changed){
+    progressChanged.set(changed);
+  }
+
   public void setExecutorProgress(float executorProgress) {
-    float adjustProgress = executorProgress * (1 - fetcherProgress);
-    setProgress(fetcherProgress + adjustProgress);
+    if(Float.isNaN(executorProgress) || Float.isInfinite(executorProgress)){
+      executorProgress = 0.0f;
+    }
+
+    if (hasFetchPhase()) {
+      setProgress(fetcherProgress + (executorProgress * 0.5f));
+    } else {
+      setProgress(executorProgress);
+    }
   }
 
   public void setFetcherProgress(float fetcherProgress) {
+    if(Float.isNaN(fetcherProgress) || Float.isInfinite(fetcherProgress)){
+      fetcherProgress = 0.0f;
+    }
     this.fetcherProgress = fetcherProgress;
   }
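 
 The two setters above clamp NaN and infinite inputs to zero and, when the task has a fetch phase, weight executor progress by 0.5 on top of the fetcher progress. A small worked example of that composition (method names here are illustrative):
 
    public class ProgressCompositionSketch {
      static float sanitize(float v) {
        return (Float.isNaN(v) || Float.isInfinite(v)) ? 0.0f : v;
      }

      static float compose(boolean hasFetchPhase, float fetcherProgress, float executorProgress) {
        fetcherProgress = sanitize(fetcherProgress);
        executorProgress = sanitize(executorProgress);
        return hasFetchPhase ? fetcherProgress + (executorProgress * 0.5f) : executorProgress;
      }

      public static void main(String[] args) {
        // all fetches finished (0.5) and the executor is 60% done -> 0.8 overall
        System.out.println(compose(true, 0.5f, 0.6f));
        // a NaN executor progress no longer poisons the overall progress
        System.out.println(compose(true, 0.5f, Float.NaN));
      }
    }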
 
@@ -375,10 +398,6 @@ public class TaskAttemptContext {
     return queryContext;
   }
 
-  public WorkerContext getWorkContext() {
-    return workerContext;
-  }
-
   public QueryUnitAttemptId getQueryId() {
     return queryId;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 32cd4f5..ea8ed82 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -18,42 +18,26 @@
 
 package org.apache.tajo.worker;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
-import org.apache.tajo.engine.utils.TupleCache;
-import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.rpc.CallFuture;
-import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.util.TajoIdUtils;
-import org.apache.tajo.worker.TajoWorker.WorkerContext;
 
-import java.net.InetSocketAddress;
-import java.util.Map;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
 
@@ -67,142 +51,78 @@ public class TaskRunner extends AbstractService {
   private TajoConf systemConf;
 
   private volatile boolean stopped = false;
+  private Path baseDirPath;
 
-  private ExecutionBlockId executionBlockId;
-  private QueryId queryId;
   private NodeId nodeId;
   private ContainerId containerId;
 
-  // Cluster Management
-  //private TajoWorkerProtocol.TajoWorkerProtocolService.Interface master;
-
-  // for temporal or intermediate files
-  private FileSystem localFS;
-  // for input files
-  private FileSystem defaultFS;
-
-  private TajoQueryEngine queryEngine;
-
   // for Fetcher
   private ExecutorService fetchLauncher;
-  // It keeps all of the query unit attempts while a TaskRunner is running.
-  private final Map<QueryUnitAttemptId, Task> tasks = new ConcurrentHashMap<QueryUnitAttemptId, Task>();
-
-  private LocalDirAllocator lDirAllocator;
 
   // A thread to receive each assigned query unit and execute the query unit
   private Thread taskLauncher;
 
   // Contains the object references related for TaskRunner
-  private TaskRunnerContext taskRunnerContext;
-  // for the doAs block
-  private UserGroupInformation taskOwner;
-
-  // for the local temporal dir
-  private String baseDir;
-  private Path baseDirPath;
-
-  private TaskRunnerManager taskRunnerManager;
+  private ExecutionBlockContext executionBlockContext;
 
   private long finishTime;
 
-  private RpcConnectionPool connPool;
-
-  private InetSocketAddress qmMasterAddr;
-
   private TaskRunnerHistory history;
 
-  public TaskRunner(TaskRunnerManager taskRunnerManager, TajoConf conf, String[] args) {
+  public TaskRunner(ExecutionBlockContext executionBlockContext, String[] args) {
     super(TaskRunner.class.getName());
 
-    this.taskRunnerManager = taskRunnerManager;
-    this.connPool = RpcConnectionPool.getPool(conf);
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    ThreadFactory fetcherFactory = builder.setNameFormat("Fetcher executor #%d").build();
+    this.systemConf = executionBlockContext.getConf();
     this.fetchLauncher = Executors.newFixedThreadPool(
-        conf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM));
+        systemConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM), fetcherFactory);
     try {
-      final ExecutionBlockId executionBlockId = TajoIdUtils.createExecutionBlockId(args[1]);
-
-      LOG.info("Tajo Root Dir: " + conf.getVar(ConfVars.ROOT_DIR));
-      LOG.info("Worker Local Dir: " + conf.getVar(ConfVars.WORKER_TEMPORAL_DIR));
-
-      UserGroupInformation.setConfiguration(conf);
-
       // QueryBlockId from String
       // NodeId has a form of hostname:port.
-      NodeId nodeId = ConverterUtils.toNodeId(args[2]);
+      this.nodeId = ConverterUtils.toNodeId(args[2]);
       this.containerId = ConverterUtils.toContainerId(args[3]);
 
 
       // QueryMaster's address
-      String host = args[4];
-      int port = Integer.parseInt(args[5]);
-      this.qmMasterAddr = NetUtils.createSocketAddrForHost(host, port);
-
-      LOG.info("QueryMaster Address:" + qmMasterAddr);
-      // TODO - 'load credential' should be implemented
-      // Getting taskOwner
-      UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.USERNAME));
-      //taskOwner.addToken(token);
-
-      // initialize MasterWorkerProtocol as an actual task owner.
-//      this.client =
-//          taskOwner.doAs(new PrivilegedExceptionAction<AsyncRpcClient>() {
-//            @Override
-//            public AsyncRpcClient run() throws Exception {
-//              return new AsyncRpcClient(TajoWorkerProtocol.class, masterAddr);
-//            }
-//          });
-//      this.master = client.getStub();
-
-      this.executionBlockId = executionBlockId;
-      this.queryId = executionBlockId.getQueryId();
-      this.nodeId = nodeId;
-      this.taskOwner = taskOwner;
-
-      this.taskRunnerContext = new TaskRunnerContext();
-      this.history = new TaskRunnerHistory(containerId, executionBlockId);
+      //String host = args[4];
+      //int port = Integer.parseInt(args[5]);
+
+      this.executionBlockContext = executionBlockContext;
+      this.history = executionBlockContext.createTaskRunnerHistory(this);
       this.history.setState(getServiceState());
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
     }
   }
 
-  protected void sendExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport reporter) throws Exception {
-    QueryMasterProtocol.QueryMasterProtocolService.Interface qmClientService = null;
-    NettyClientBase qmClient = null;
-    try {
-      qmClient = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
-      qmClientService = qmClient.getStub();
-      qmClientService.doneExecutionBlock(null, reporter, NullCallback.get());
-    } finally {
-      connPool.releaseConnection(qmClient);
-    }
+  // TODO: this is expensive. We should change it to a unique id.
+  public String getId() {
+    return getId(getContext().getExecutionBlockId(), containerId);
   }
 
-  public String getId() {
-    return getId(executionBlockId, containerId);
+  public NodeId getNodeId(){
+    return nodeId;
+  }
+
+  public ContainerId getContainerId(){
+    return containerId;
   }
 
   public static String getId(ExecutionBlockId executionBlockId, ContainerId containerId) {
     return executionBlockId + "," + containerId;
   }
 
-  public static Path getBaseOutputDir(ExecutionBlockId executionBlockId){
-    Path workDir =
-        StorageUtil.concatPath(
-            executionBlockId.getQueryId().toString(),
-            "output",
-            String.valueOf(executionBlockId.getId()));
-    return workDir;
+  public TaskRunnerHistory getHistory(){
+    return history;
   }
 
-  public static Path getBaseInputDir(ExecutionBlockId executionBlockId) {
-    Path workDir =
-        StorageUtil.concatPath(
-            executionBlockId.getQueryId().toString(),
-            "in",
-            executionBlockId.toString());
-    return workDir;
+  public Path getTaskBaseDir(){
+    return baseDirPath;
+  }
+
+  public ExecutorService getFetchLauncher() {
+    return fetchLauncher;
   }
 
   @Override
@@ -210,27 +130,13 @@ public class TaskRunner extends AbstractService {
     this.systemConf = (TajoConf)conf;
 
     try {
-      // initialize DFS and LocalFileSystems
-      defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(conf);
-      localFS = FileSystem.getLocal(conf);
-
       // the base dir for an output dir
-      baseDir = getBaseOutputDir(executionBlockId).toString();
-
-      // initialize LocalDirAllocator
-      lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
-
-      baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(baseDir, conf));
-      LOG.info("TaskRunner basedir is created (" + baseDir +")");
-
-      // Setup QueryEngine according to the query plan
-      // Here, we can setup row-based query engine or columnar query engine.
-      this.queryEngine = new TajoQueryEngine(systemConf);
+      baseDirPath = getContext().createBaseDir();
+      LOG.info("TaskRunner basedir is created (" + baseDirPath +")");
     } catch (Throwable t) {
       t.printStackTrace();
       LOG.error(t);
     }
-
     super.init(conf);
     this.history.setState(getServiceState());
   }
@@ -253,23 +159,10 @@ public class TaskRunner extends AbstractService {
     // If this flag become true, taskLauncher will be terminated.
     this.stopped = true;
 
-    // If TaskRunner is stopped, all running or pending tasks will be marked as failed.
-    for (Task task : tasks.values()) {
-      if (task.getStatus() == TaskAttemptState.TA_PENDING ||
-          task.getStatus() == TaskAttemptState.TA_RUNNING) {
-        task.setState(TaskAttemptState.TA_FAILED);
-        task.abort();
-      }
-    }
-
-    tasks.clear();
     fetchLauncher.shutdown();
     fetchLauncher = null;
-    this.queryEngine = null;
-
-    TupleCache.getInstance().removeBroadcastCache(executionBlockId);
 
-    LOG.info("Stop TaskRunner: " + executionBlockId);
+    LOG.info("Stop TaskRunner: " + getId());
     synchronized (this) {
       notifyAll();
     }
@@ -281,71 +174,8 @@ public class TaskRunner extends AbstractService {
     return finishTime;
   }
 
-  public class TaskRunnerContext {
-    public AtomicInteger completedTasksNum = new AtomicInteger();
-    public AtomicInteger succeededTasksNum = new AtomicInteger();
-    public AtomicInteger killedTasksNum = new AtomicInteger();
-    public AtomicInteger failedTasksNum = new AtomicInteger();
-
-    public TajoConf getConf() {
-      return systemConf;
-    }
-
-    public String getNodeId() {
-      return nodeId.toString();
-    }
-
-    public FileSystem getLocalFS() {
-      return localFS;
-    }
-
-    public FileSystem getDefaultFS() {
-      return defaultFS;
-    }
-
-    public LocalDirAllocator getLocalDirAllocator() {
-      return lDirAllocator;
-    }
-
-    public TajoQueryEngine getTQueryEngine() {
-      return queryEngine;
-    }
-
-    public Map<QueryUnitAttemptId, Task> getTasks() {
-      return tasks;
-    }
-
-    public Task getTask(QueryUnitAttemptId taskId) {
-      return tasks.get(taskId);
-    }
-
-    public ExecutorService getFetchLauncher() {
-      return fetchLauncher;
-    }
-
-    public Path getBaseDir() {
-      return baseDirPath;
-    }
-
-    public ExecutionBlockId getExecutionBlockId() {
-      return executionBlockId;
-    }
-
-    public void addTaskHistory(QueryUnitAttemptId quAttemptId, TaskHistory taskHistory) {
-      history.addTaskHistory(quAttemptId, taskHistory);
-    }
-
-    public TaskRunnerHistory getExcutionBlockHistory(){
-      return history;
-    }
-
-    public WorkerContext getWorkerContext() {
-      return taskRunnerManager.getWorkerContext();
-    }
-  }
-
-  public TaskRunnerContext getContext() {
-    return taskRunnerContext;
+  public ExecutionBlockContext getContext() {
+    return executionBlockContext;
   }
 
   static void fatalError(QueryMasterProtocolService.Interface qmClientService,
@@ -372,17 +202,15 @@ public class TaskRunner extends AbstractService {
           QueryUnitRequestProto taskRequest = null;
 
           while(!stopped) {
-            NettyClientBase qmClient = null;
-            QueryMasterProtocolService.Interface qmClientService = null;
+            QueryMasterProtocolService.Interface qmClientService;
             try {
-              qmClient = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
-              qmClientService = qmClient.getStub();
+              qmClientService = getContext().getQueryMasterStub();
 
               if (callFuture == null) {
                 callFuture = new CallFuture<QueryUnitRequestProto>();
                 LOG.info("Request GetTask: " + getId());
                 GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
-                    .setExecutionBlockId(executionBlockId.getProto())
+                    .setExecutionBlockId(getExecutionBlockId().getProto())
                     .setContainerId(((ContainerIdPBImpl) containerId).getProto())
                     .build();
 
@@ -414,17 +242,14 @@ public class TaskRunner extends AbstractService {
                 if (taskRequest.getShouldDie()) {
                   LOG.info("Received ShouldDie flag:" + getId());
                   stop();
-                  if(taskRunnerManager != null) {
-                    //notify to TaskRunnerManager
-                    taskRunnerManager.stopTask(getId());
-                    taskRunnerManager= null;
-                  }
+                  //notify to TaskRunnerManager
+                  getContext().stopTaskRunner(getId());
                 } else {
-                  taskRunnerManager.getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
+                  getContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
                   LOG.info("Accumulated Received Task: " + (++receivedNum));
 
                   QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
-                  if (tasks.containsKey(taskAttemptId)) {
+                  if (getContext().getTasks().containsKey(taskAttemptId)) {
                     LOG.error("Duplicate Task Attempt: " + taskAttemptId);
                     fatalError(qmClientService, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
                     continue;
@@ -433,9 +258,9 @@ public class TaskRunner extends AbstractService {
                   LOG.info("Initializing: " + taskAttemptId);
                   Task task;
                   try {
-                    task = new Task(taskAttemptId, taskRunnerContext, qmClientService,
+                    task = new Task(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext,
                         new QueryUnitRequestImpl(taskRequest));
-                    tasks.put(taskAttemptId, task);
+                    getContext().getTasks().put(taskAttemptId, task);
 
                     task.init();
                     if (task.hasFetchPhase()) {
@@ -453,9 +278,7 @@ public class TaskRunner extends AbstractService {
                 }
               }
             } catch (Throwable t) {
-              t.printStackTrace();
-            } finally {
-              connPool.releaseConnection(qmClient);
+              LOG.fatal(t.getMessage(), t);
             }
           }
         }
@@ -463,12 +286,6 @@ public class TaskRunner extends AbstractService {
       taskLauncher.start();
     } catch (Throwable t) {
       LOG.fatal("Unhandled exception. Starting shutdown.", t);
-    } finally {
-      for (Task t : tasks.values()) {
-        if (t.getStatus() != TaskAttemptState.TA_SUCCEEDED) {
-          t.abort();
-        }
-      }
     }
   }
 
@@ -480,6 +297,6 @@ public class TaskRunner extends AbstractService {
   }
 
   public ExecutionBlockId getExecutionBlockId() {
-    return this.executionBlockId;
+    return getContext().getExecutionBlockId();
   }
 }
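 
 The net effect of this file's changes is that TaskRunner keeps only its fetch launcher, container and node ids, and base directory, while the task map, the counters, the query-master stub, and the query engine move into a shared per-execution-block context. An illustrative sketch of that shared state, mirroring only the members visible in this diff (the generic value types are stand-ins for Task and TaskRunner):
 
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicInteger;

    public class SharedBlockStateSketch<TASK, RUNNER> {
      // task counters are now accumulated per execution block, not per TaskRunner
      public final AtomicInteger completedTasksNum = new AtomicInteger();
      public final AtomicInteger succeededTasksNum = new AtomicInteger();
      public final AtomicInteger killedTasksNum = new AtomicInteger();
      public final AtomicInteger failedTasksNum = new AtomicInteger();

      // a single task map shared by every TaskRunner of the block
      private final ConcurrentMap<String, TASK> tasks = new ConcurrentHashMap<String, TASK>();
      // runners register themselves so a Task can find its runner (and fetch launcher) by id
      private final ConcurrentMap<String, RUNNER> taskRunners = new ConcurrentHashMap<String, RUNNER>();

      public ConcurrentMap<String, TASK> getTasks() { return tasks; }

      public void addTaskRunner(String runnerId, RUNNER runner) { taskRunners.put(runnerId, runner); }

      public RUNNER getTaskRunner(String runnerId) { return taskRunners.get(runnerId); }
    }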