You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 13:44:21 UTC

[18/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
new file mode 100644
index 0000000..79f14a2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.ContainerProxy;
+import org.apache.tajo.master.TajoContainerProxy;
+import org.apache.tajo.master.TaskRunnerGroupEvent;
+import org.apache.tajo.master.TaskRunnerLauncher;
+import org.apache.tajo.master.event.ContainerAllocationEvent;
+import org.apache.tajo.master.event.ContainerAllocatorEventType;
+import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.master.querymaster.SubQueryState;
+import org.apache.tajo.master.rm.TajoWorkerContainer;
+import org.apache.tajo.master.rm.TajoWorkerContainerId;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.util.ApplicationIdUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TajoResourceAllocator extends AbstractResourceAllocator {
+  private static final Log LOG = LogFactory.getLog(TajoResourceAllocator.class);
+
+  private TajoConf tajoConf;
+  private QueryMasterTask.QueryMasterTaskContext queryTaskContext;
+  private final ExecutorService executorService;
+
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+
+  public TajoResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) {
+    this.queryTaskContext = queryTaskContext;
+    executorService = Executors.newFixedThreadPool(
+        queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
+  }
+
+  /**
+   * Builds a {@link TajoWorkerContainerId} from its protobuf representation.
+   *
+   * @param containerIdProto protobuf message carrying the application attempt id and the numeric id
+   * @return a new container id populated from {@code containerIdProto}
+   */
+  @Override
+  public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerIdProto) {
+    ApplicationAttemptId attemptId = new ApplicationAttemptIdPBImpl(containerIdProto.getAppAttemptId());
+
+    TajoWorkerContainerId workerContainerId = new TajoWorkerContainerId();
+    workerContainerId.setApplicationAttemptId(attemptId);
+    workerContainerId.setId(containerIdProto.getId());
+    return workerContainerId;
+  }
+
+  /**
+   * Intentionally a no-op in this allocator: worker allocation is driven by
+   * {@code ContainerAllocationEvent}s handled by {@code TajoWorkerAllocationHandler}.
+   */
+  @Override
+  public void allocateTaskWorker() {
+  }
+
+  /**
+   * Computes how many containers to request for a set of tasks.
+   * <p>
+   * The number of cluster slots is the cluster's total memory divided by the memory
+   * required per task, minus one slot reserved for the query master, and never less
+   * than one. The result is capped by the number of tasks.
+   *
+   * @param workerContext   source of the latest cluster resource summary (may hold none yet)
+   * @param numTasks        number of tasks to be executed
+   * @param memoryMBPerTask memory required by one task, in MB; a non-positive value is
+   *                        treated as "no slot can be computed" and yields a single slot
+   * @return the number of containers to request
+   */
+  @Override
+  public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
+                                           int numTasks,
+                                           int memoryMBPerTask) {
+    //TODO consider disk slot
+
+    TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource();
+    int clusterSlots = 0;
+    if (memoryMBPerTask <= 0) {
+      // Guard the division below: a zero divisor would throw ArithmeticException, and a
+      // negative one already ended up clamped to a single slot by the Math.max() below.
+      LOG.warn("Invalid memoryMBPerTask=" + memoryMBPerTask + ", assuming a single cluster slot");
+    } else if (clusterResource != null) {
+      clusterSlots = clusterResource.getTotalMemoryMB() / memoryMBPerTask;
+    }
+    clusterSlots =  Math.max(1, clusterSlots - 1); // reserve query master slot
+    LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks +
+        ", Number of Cluster Slots=" + clusterSlots);
+    return  Math.min(numTasks, clusterSlots);
+  }
+
+  /**
+   * Stores the configuration and registers this allocator's event handlers on the
+   * query task dispatcher before delegating to the parent {@code init}.
+   *
+   * @param conf configuration; cast to {@link TajoConf} without a type check
+   */
+  @Override
+  public void init(Configuration conf) {
+    tajoConf = (TajoConf)conf;
+
+    // Launch / cleanup requests for task runner containers.
+    queryTaskContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, new TajoTaskRunnerLauncher());
+
+    // Requests for worker resources from the Tajo master.
+    queryTaskContext.getDispatcher().register(ContainerAllocatorEventType.class, new TajoWorkerAllocationHandler());
+
+    super.init(conf);
+  }
+
+  /**
+   * Stops the allocator once: shuts down the worker thread pool and then stops every
+   * container proxy currently known to the query's resource allocator.
+   * Subsequent calls return immediately.
+   */
+  @Override
+  public synchronized void stop() {
+    if (stopped.getAndSet(true)) {
+      return;
+    }
+
+    executorService.shutdownNow();
+
+    Map<ContainerId, ContainerProxy> containers = queryTaskContext.getResourceAllocator().getContainers();
+    // Iterate over a snapshot of the proxies rather than the live map values.
+    List<ContainerProxy> list = new ArrayList<ContainerProxy>(containers.values());
+    for(ContainerProxy eachProxy: list) {
+      try {
+        eachProxy.stopContainer();
+      } catch (Exception e) {
+        // Best effort: keep stopping the remaining containers, but keep the stack trace.
+        LOG.warn(e.getMessage(), e);
+      }
+    }
+    super.stop();
+  }
+
+  /** Starts the service; adds nothing beyond the parent implementation. */
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  /**
+   * Dispatches task runner group events: remote launch requests start task runners,
+   * remote cleanup requests stop the listed containers. Other event types are ignored.
+   */
+  class TajoTaskRunnerLauncher implements TaskRunnerLauncher {
+    @Override
+    public void handle(TaskRunnerGroupEvent event) {
+      if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_LAUNCH) {
+        launchTaskRunners(event.getExecutionBlockId(), event.getContainers());
+        return;
+      }
+
+      if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_CLEANUP) {
+        stopContainers(event.getContainers());
+      }
+    }
+  }
+
+  /**
+   * Submits one launch task per container to the worker thread pool.
+   * A query in standby mode does not need to launch a worker; the execution block
+   * is simply assigned to the already allocated Tajo worker.
+   *
+   * @param executionBlockId execution block the containers will run
+   * @param containers       containers to launch task runners on
+   */
+  private void launchTaskRunners(ExecutionBlockId executionBlockId, Collection<Container> containers) {
+    for (Container container : containers) {
+      TajoContainerProxy proxy =
+          new TajoContainerProxy(queryTaskContext, tajoConf, container, executionBlockId);
+      LaunchRunner launchTask = new LaunchRunner(container.getId(), proxy);
+      executorService.submit(launchTask);
+    }
+  }
+
+  /** Pool task that launches a single container proxy. */
+  protected static class LaunchRunner implements Runnable {
+    private final ContainerProxy proxy;
+    private final ContainerId id;
+
+    /**
+     * @param id    id of the container being launched (used for logging only)
+     * @param proxy proxy to launch
+     */
+    public LaunchRunner(ContainerId id, ContainerProxy proxy) {
+      this.id = id;
+      this.proxy = proxy;
+    }
+
+    @Override
+    public void run() {
+      proxy.launch(null);
+      if (!LOG.isDebugEnabled()) {
+        return;
+      }
+      LOG.debug("ContainerProxy started:" + id);
+    }
+  }
+
+  /**
+   * Submits one stop task per container to the worker thread pool, using the proxy
+   * registered for that container id in the query's resource allocator.
+   *
+   * @param containers containers to stop
+   */
+  private void stopContainers(Collection<Container> containers) {
+    for (Container eachContainer : containers) {
+      final ContainerProxy registeredProxy =
+          queryTaskContext.getResourceAllocator().getContainer(eachContainer.getId());
+      StopContainerRunner stopTask = new StopContainerRunner(eachContainer.getId(), registeredProxy);
+      executorService.submit(stopTask);
+    }
+  }
+
+  /** Pool task that stops a single container proxy. */
+  private static class StopContainerRunner implements Runnable {
+    private final ContainerProxy proxy;
+    private final ContainerId id;
+
+    /**
+     * @param id    id of the container to stop (used for logging)
+     * @param proxy proxy to stop; may be {@code null} when no proxy was found for {@code id}
+     */
+    public StopContainerRunner(ContainerId id, ContainerProxy proxy) {
+      this.id = id;
+      this.proxy = proxy;
+    }
+
+    @Override
+    public void run() {
+      if (proxy == null) {
+        // The caller passes whatever the allocator lookup returned for this id. Without
+        // this guard a missing proxy would only surface as a NullPointerException
+        // inside the pool thread.
+        LOG.warn("No ContainerProxy to stop for container:" + id);
+        return;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("ContainerProxy stopped:" + id + "," + proxy.getId());
+      }
+      proxy.stopContainer();
+    }
+  }
+
+  /**
+   * Handles container allocation events by running a {@link TajoWorkerAllocationThread}
+   * on the shared worker thread pool, so the caller of {@code handle} does not wait
+   * for the allocation response.
+   */
+  class TajoWorkerAllocationHandler implements EventHandler<ContainerAllocationEvent> {
+    @Override
+    public void handle(ContainerAllocationEvent event) {
+      executorService.submit(new TajoWorkerAllocationThread(event));
+    }
+  }
+
+  /**
+   * Task that asks the Tajo master for worker resources for one execution block,
+   * waits for the answer, converts the allocated resources into containers and
+   * notifies the sub query. If fewer containers than required were allocated, a
+   * follow-up allocation event is posted for the remainder.
+   * <p>
+   * Although this class extends {@link Thread}, instances are submitted to the
+   * shared executor and therefore only {@link #run()} is used.
+   */
+  class TajoWorkerAllocationThread extends Thread {
+    ContainerAllocationEvent event;
+    TajoWorkerAllocationThread(ContainerAllocationEvent event) {
+      this.event = event;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Start TajoWorkerAllocationThread");
+      CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
+          new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();
+
+      //TODO consider task's resource usage pattern
+      int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
+      float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK);
+
+      // Leaf queries are requested with DISK priority, all others with MEMORY priority.
+      TajoMasterProtocol.WorkerResourceAllocationRequest request =
+          TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
+              .setMinMemoryMBPerContainer(requiredMemoryMB)
+              .setMaxMemoryMBPerContainer(requiredMemoryMB)
+              .setNumContainers(event.getRequiredNum())
+              .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY
+                  : TajoMasterProtocol.ResourceRequestPriority.DISK)
+              .setMinDiskSlotPerContainer(requiredDiskSlots)
+              .setMaxDiskSlotPerContainer(requiredDiskSlots)
+              .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
+              .build();
+
+      RpcConnectionPool connPool = RpcConnectionPool.getPool(queryTaskContext.getConf());
+      NettyClientBase tmClient = null;
+      try {
+        tmClient = connPool.getConnection(
+            queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+            TajoMasterProtocol.class, true);
+        TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+        masterClientService.allocateWorkerResources(null, request, callBack);
+      } catch (Exception e) {
+        // NOTE(review): when the request could not be sent, callBack is presumably never
+        // completed and the loop below keeps timing out until the allocator is stopped.
+        LOG.error(e.getMessage(), e);
+      } finally {
+        connPool.releaseConnection(tmClient);
+      }
+
+      // Poll for the response in 3 second slices so that a stop request is noticed.
+      TajoMasterProtocol.WorkerResourceAllocationResponse response = null;
+      while(!stopped.get()) {
+        try {
+          response = callBack.get(3, TimeUnit.SECONDS);
+          break;
+        } catch (InterruptedException e) {
+          // An interrupt only ends the wait when the allocator has been stopped;
+          // otherwise keep waiting for the response.
+          if(stopped.get()) {
+            return;
+          }
+        } catch (TimeoutException e) {
+          LOG.info("No available worker resource for " + event.getExecutionBlockId());
+          continue;
+        }
+      }
+      int numAllocatedContainers = 0;
+
+      if(response != null) {
+        List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
+        ExecutionBlockId executionBlockId = event.getExecutionBlockId();
+
+        // Convert every allocated worker resource into a container describing the worker.
+        List<Container> containers = new ArrayList<Container>();
+        for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
+          TajoWorkerContainer container = new TajoWorkerContainer();
+          NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getWorkerHost(),
+              eachAllocatedResource.getPeerRpcPort());
+
+          TajoWorkerContainerId containerId = new TajoWorkerContainerId();
+
+          containerId.setApplicationAttemptId(
+              ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId(),
+                  eachAllocatedResource.getContainerId().getAppAttemptId().getAttemptId()));
+          containerId.setId(eachAllocatedResource.getContainerId().getId());
+
+          container.setId(containerId);
+          container.setNodeId(nodeId);
+
+
+          WorkerResource workerResource = new WorkerResource();
+          workerResource.setMemoryMB(eachAllocatedResource.getAllocatedMemoryMB());
+          workerResource.setDiskSlots(eachAllocatedResource.getAllocatedDiskSlots());
+
+          Worker worker = new Worker(null, workerResource);
+          worker.setHostName(nodeId.getHost());
+          worker.setPeerRpcPort(nodeId.getPort());
+          worker.setQueryMasterPort(eachAllocatedResource.getQueryMasterPort());
+          worker.setPullServerPort(eachAllocatedResource.getWorkerPullServerPort());
+
+          container.setWorkerResource(worker);
+
+          containers.add(container);
+        }
+
+        // The sub query may have left its running state while we were waiting; in that
+        // case hand the resources straight back instead of launching anything.
+        SubQueryState state = queryTaskContext.getSubQuery(executionBlockId).getState();
+        if (!SubQuery.isRunningState(state)) {
+          try {
+            List<ContainerId> containerIds = new ArrayList<ContainerId>();
+            for(Container eachContainer: containers) {
+              containerIds.add(eachContainer.getId());
+            }
+            TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, containerIds);
+          } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+          }
+          return;
+        }
+
+        if (allocatedResources.size() > 0) {
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("SubQueryContainerAllocationEvent fire:" + executionBlockId);
+          }
+          queryTaskContext.getEventHandler().handle(new SubQueryContainerAllocationEvent(executionBlockId, containers));
+        }
+        numAllocatedContainers += allocatedResources.size();
+
+      }
+      // Ask again for the containers that are still missing, unless the allocator has
+      // been stopped: stop() shuts the executor down, so a follow-up request could no
+      // longer be served and must not be posted.
+      if(!stopped.get() && event.getRequiredNum() > numAllocatedContainers) {
+        ContainerAllocationEvent shortRequestEvent = new ContainerAllocationEvent(
+            event.getType(), event.getExecutionBlockId(), event.getPriority(),
+            event.getResource(),
+            event.getRequiredNum() - numAllocatedContainers,
+            event.isLeafQuery(), event.getProgress()
+        );
+        queryTaskContext.getEventHandler().handle(shortRequestEvent);
+
+      }
+      LOG.info("Stop TajoWorkerAllocationThread");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
new file mode 100644
index 0000000..0b8d6c2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -0,0 +1,583 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.codahale.metrics.Gauge;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.shell.PathData;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.CatalogClient;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.querymaster.QueryMaster;
+import org.apache.tajo.master.querymaster.QueryMasterManagerService;
+import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
+import org.apache.tajo.webapp.StaticHttpServer;
+
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+
+public class TajoWorker extends CompositeService {
+  public static final PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
+  public static final PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+
+  public static final String WORKER_MODE_YARN_TASKRUNNER = "tr";
+  public static final String WORKER_MODE_YARN_QUERYMASTER = "qm";
+  public static final String WORKER_MODE_STANDBY = "standby";
+  public static final String WORKER_MODE_QUERY_MASTER = "standby-qm";
+  public static final String WORKER_MODE_TASKRUNNER = "standby-tr";
+
+  private static final Log LOG = LogFactory.getLog(TajoWorker.class);
+
+  private TajoConf systemConf;
+
+  private StaticHttpServer webServer;
+
+  private TajoWorkerClientService tajoWorkerClientService;
+
+  private QueryMasterManagerService queryMasterManagerService;
+
+  private TajoWorkerManagerService tajoWorkerManagerService;
+
+  private InetSocketAddress tajoMasterAddress;
+
+  private InetSocketAddress workerResourceTrackerAddr;
+
+  private CatalogClient catalogClient;
+
+  private WorkerContext workerContext;
+
+  private TaskRunnerManager taskRunnerManager;
+
+  private TajoPullServerService pullService;
+
+  private boolean yarnContainerMode;
+
+  private boolean queryMasterMode;
+
+  private boolean taskRunnerMode;
+
+  private WorkerHeartbeatService workerHeartbeatThread;
+
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private AtomicInteger numClusterNodes = new AtomicInteger();
+
+  private TajoMasterProtocol.ClusterResourceSummary clusterResource;
+
+  private int httpPort;
+
+  private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+
+  private RpcConnectionPool connPool;
+
+  private String[] cmdArgs;
+
+  private DeletionService deletionService;
+
+  private TajoSystemMetrics workerSystemMetrics;
+
+  /**
+   * Creates the worker service, named after this class.
+   * Nothing is initialized until {@code startWorker} (or {@code init}/{@code start}) is called.
+   */
+  public TajoWorker() throws Exception {
+    super(TajoWorker.class.getName());
+  }
+
+  /**
+   * Configures the worker mode from the command line, then initializes and starts the service.
+   *
+   * @param systemConf system configuration used for the whole worker
+   * @param args       command line arguments; {@code args[0]}, when present, selects the worker mode
+   */
+  public void startWorker(TajoConf systemConf, String[] args) {
+    this.systemConf = systemConf;
+    this.cmdArgs = args;
+    // The mode flags must be set before init(), which reads them to decide what to start.
+    setWorkerMode(args);
+    init(systemConf);
+    start();
+  }
+
+  /**
+   * Derives the {@code yarnContainerMode}, {@code queryMasterMode} and {@code taskRunnerMode}
+   * flags. Without arguments both roles are read from the configuration (each defaulting to
+   * enabled); otherwise {@code args[0]} selects the mode. The process exits when neither
+   * role ends up enabled.
+   *
+   * @param args command line arguments; must not be {@code null}
+   */
+  private void setWorkerMode(String[] args) {
+    if(args.length < 1) {
+      queryMasterMode = systemConf.getBoolean("tajo.worker.mode.querymaster", true);
+      taskRunnerMode = systemConf.getBoolean("tajo.worker.mode.taskrunner", true);
+    } else {
+      if(WORKER_MODE_STANDBY.equals(args[0])) {
+        queryMasterMode = true;
+        taskRunnerMode = true;
+      } else if(WORKER_MODE_YARN_TASKRUNNER.equals(args[0])) {
+        // NOTE(review): the YARN task runner argument ("tr") enables queryMasterMode here,
+        // and the YARN query master argument ("qm") below enables taskRunnerMode. This looks
+        // swapped relative to the constant names - confirm against the YARN launcher.
+        yarnContainerMode = true;
+        queryMasterMode = true;
+      } else if(WORKER_MODE_YARN_QUERYMASTER.equals(args[0])) {
+        yarnContainerMode = true;
+        taskRunnerMode = true;
+      } else if(WORKER_MODE_QUERY_MASTER.equals(args[0])) {
+        yarnContainerMode = false;
+        queryMasterMode = true;
+      } else {
+        // Any other argument (including WORKER_MODE_TASKRUNNER) means a standby task runner.
+        yarnContainerMode = false;
+        taskRunnerMode = true;
+      }
+    }
+    if(!queryMasterMode && !taskRunnerMode) {
+      // Only reachable from the no-argument branch, when both properties are set to false.
+      LOG.fatal("Worker daemon exit cause no worker mode(querymaster/taskrunner) property");
+      System.exit(0);
+    }
+  }
+  
+  /**
+   * Wires up all worker sub-services (client, query master manager, task runner manager,
+   * worker manager and, outside YARN container mode, the pull server and info web server),
+   * initializes them through the parent {@code init}, and finally resolves the master
+   * addresses according to the worker mode.
+   *
+   * @param conf configuration; cast to {@link TajoConf} without a type check
+   */
+  @Override
+  public void init(Configuration conf) {
+    Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
+
+    this.systemConf = (TajoConf)conf;
+    RackResolver.init(systemConf);
+
+    this.connPool = RpcConnectionPool.getPool(systemConf);
+    this.workerContext = new WorkerContext();
+
+    String resourceManagerClassName = systemConf.getVar(ConfVars.RESOURCE_MANAGER_CLASS);
+
+    // Configured ports are only used with the TajoWorkerResourceManager; with any other
+    // resource manager every RPC port (and the pull server port) is set to 0.
+    boolean randomPort = true;
+    if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) {
+      randomPort = false;
+    }
+    int clientPort = systemConf.getSocketAddrVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS).getPort();
+    int peerRpcPort = systemConf.getSocketAddrVar(ConfVars.WORKER_PEER_RPC_ADDRESS).getPort();
+    int qmManagerPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_RPC_ADDRESS).getPort();
+
+    if(randomPort) {
+      clientPort = 0;
+      peerRpcPort = 0;
+      qmManagerPort = 0;
+      systemConf.setIntVar(ConfVars.PULLSERVER_PORT, 0);
+    }
+
+    // querymaster worker
+    tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort);
+    addService(tajoWorkerClientService);
+
+    queryMasterManagerService = new QueryMasterManagerService(workerContext, qmManagerPort);
+    addService(queryMasterManagerService);
+
+    // taskrunner worker
+    taskRunnerManager = new TaskRunnerManager(workerContext);
+    addService(taskRunnerManager);
+
+    tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
+    addService(tajoWorkerManagerService);
+
+    if(!yarnContainerMode) {
+      if(taskRunnerMode) {
+        pullService = new TajoPullServerService();
+        addService(pullService);
+      }
+
+      // The info web server and the deletion service are skipped when running under tests.
+      if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")) {
+        try {
+          httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort();
+          if(queryMasterMode && !taskRunnerMode) {
+            //If QueryMaster and TaskRunner run on single host, http port conflicts
+            httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_INFO_ADDRESS).getPort();
+          }
+          webServer = StaticHttpServer.getInstance(this ,"worker", null, httpPort ,
+              true, null, systemConf, null);
+          webServer.start();
+          // Read the port back from the server after it has started.
+          httpPort = webServer.getPort();
+          LOG.info("Worker info server started:" + httpPort);
+
+          // The first DeletionService argument is the number of mount points found.
+          deletionService = new DeletionService(getMountPath().size(), 0);
+          if(systemConf.getBoolVar(ConfVars.WORKER_TEMPORAL_DIR_CLEANUP)){
+            getWorkerContext().cleanupTemporalDirectories();
+          }
+        } catch (IOException e) {
+          LOG.error(e.getMessage(), e);
+        }
+      }
+    }
+
+    LOG.info("Tajo Worker started: queryMaster=" + queryMasterMode + " taskRunner=" + taskRunnerMode +
+        ", qmRpcPort=" + qmManagerPort +
+        ",yarnContainer=" + yarnContainerMode + ", clientPort=" + clientPort +
+        ", peerRpcPort=" + peerRpcPort + ":" + qmManagerPort + ",httpPort" + httpPort);
+
+    super.init(conf);
+
+    if(yarnContainerMode && queryMasterMode) {
+      // In this mode the master address and the query id are taken from the command line.
+      tajoMasterAddress = NetUtils.createSocketAddr(cmdArgs[2]);
+      connectToCatalog();
+
+      QueryId queryId = TajoIdUtils.parseQueryId(cmdArgs[1]);
+      queryMasterManagerService.getQueryMaster().reportQueryStatusToQueryMaster(
+          queryId, TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
+    } else if(yarnContainerMode && taskRunnerMode) { //TaskRunner mode
+      taskRunnerManager.startTask(cmdArgs);
+    } else {
+      tajoMasterAddress = NetUtils.createSocketAddr(systemConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
+      workerResourceTrackerAddr = NetUtils.createSocketAddr(systemConf.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
+      connectToCatalog();
+    }
+
+    // Created after super.init(), so the heartbeat service is initialized explicitly here.
+    workerHeartbeatThread = new WorkerHeartbeatService(workerContext);
+    workerHeartbeatThread.init(conf);
+    addIfService(workerHeartbeatThread);
+  }
+
+  /**
+   * Starts the worker metrics system and registers two gauges: the number of running
+   * queries on the query master and the number of running tasks. Each gauge reports 0
+   * while its backing service is not available.
+   */
+  private void initWorkerMetrics() {
+    workerSystemMetrics = new TajoSystemMetrics(systemConf, "worker", workerContext.getWorkerName());
+    workerSystemMetrics.start();
+
+    Gauge<Integer> runningQueriesGauge = new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        if(queryMasterManagerService == null) {
+          return 0;
+        }
+        return queryMasterManagerService.getQueryMaster().getQueryMasterTasks().size();
+      }
+    };
+    workerSystemMetrics.register("querymaster", "runningQueries", runningQueriesGauge);
+
+    Gauge<Integer> runningTasksGauge = new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        if(taskRunnerManager == null) {
+          return 0;
+        }
+        return taskRunnerManager.getNumTasks();
+      }
+    };
+    workerSystemMetrics.register("task", "runningTasks", runningTasksGauge);
+  }
+
+  /** @return the context created in {@code init}; {@code null} before {@code init} has run */
+  public WorkerContext getWorkerContext() {
+    return workerContext;
+  }
+
+  /** Starts all registered sub-services, then starts the worker metrics. */
+  @Override
+  public void start() {
+    super.start();
+    // Runs after super.start(); it reads the worker name from the services started above.
+    initWorkerMetrics();
+  }
+
+  /**
+   * Stops the worker once: the info web server, catalog client, RPC connection pool,
+   * metrics and deletion service are shut down before the registered sub-services.
+   * Subsequent calls return immediately.
+   */
+  @Override
+  public void stop() {
+    if(stopped.getAndSet(true)) {
+      return;
+    }
+
+    if(webServer != null) {
+      try {
+        webServer.stop();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
+    if (catalogClient != null) {
+      catalogClient.close();
+    }
+
+    if(connPool != null) {
+      connPool.shutdown();
+      RpcChannelFactory.shutdown();
+    }
+
+    // Second attempt in case the web server is still alive after the first stop above.
+    if(webServer != null && webServer.isAlive()) {
+      try {
+        webServer.stop();
+      } catch (Exception e) {
+        // Best effort, but do not hide the failure completely.
+        LOG.warn(e.getMessage(), e);
+      }
+    }
+
+    if(workerSystemMetrics != null) {
+      workerSystemMetrics.stop();
+    }
+
+    if(deletionService != null) deletionService.stop();
+    super.stop();
+    LOG.info("TajoWorker main thread exiting");
+  }
+
+  /**
+   * View of the enclosing {@link TajoWorker} handed to its sub-services: accessors for the
+   * services and addresses created in {@code init}, plus helpers for stopping the worker
+   * and cleaning up temporary directories.
+   */
+  public class WorkerContext {
+    /** @return the query master, or {@code null} when the query master manager service is not set */
+    public QueryMaster getQueryMaster() {
+      if(queryMasterManagerService == null) {
+        return null;
+      }
+      return queryMasterManagerService.getQueryMaster();
+    }
+
+    public TajoWorkerManagerService getTajoWorkerManagerService() {
+      return tajoWorkerManagerService;
+    }
+
+    public QueryMasterManagerService getQueryMasterManagerService() {
+      return queryMasterManagerService;
+    }
+
+    public TajoWorkerClientService getTajoWorkerClientService() {
+      return tajoWorkerClientService;
+    }
+
+    public TaskRunnerManager getTaskRunnerManager() {
+      return taskRunnerManager;
+    }
+
+    /** @return the catalog client; {@code null} if the connection to the catalog was not established */
+    public CatalogService getCatalog() {
+      return catalogClient;
+    }
+
+    /** @return the pull server service; only created outside YARN container mode for task runners */
+    public TajoPullServerService getPullService() {
+      return pullService;
+    }
+
+    public int getHttpPort() {
+      return httpPort;
+    }
+
+    /**
+     * @return the host and port of the query master manager service when this worker runs
+     *         in query master mode, otherwise those of the worker manager service
+     */
+    public String getWorkerName() {
+      if(queryMasterMode) {
+        return getQueryMasterManagerService().getHostAndPort();
+      } else {
+        return getTajoWorkerManagerService().getHostAndPort();
+      }
+    }
+    /**
+     * Stops the worker.
+     *
+     * @param force when {@code true}, the JVM is terminated after the worker has stopped
+     */
+    public void stopWorker(boolean force) {
+      stop();
+      if(force) {
+        System.exit(0);
+      }
+    }
+
+    /**
+     * Schedules deletion of {@code strPath} under every local temporary directory.
+     * Does nothing when no deletion service is available.
+     *
+     * @param strPath path relative to the worker's temporary directories
+     */
+    protected void cleanup(String strPath) {
+      if(deletionService == null) return;
+
+      LocalDirAllocator lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+
+      try {
+        Iterable<Path> iter = lDirAllocator.getAllLocalPathsToRead(strPath, systemConf);
+        FileSystem localFS = FileSystem.getLocal(systemConf);
+        for (Path path : iter){
+          deletionService.delete(localFS.makeQualified(path));
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
+    /**
+     * Schedules deletion of everything directly under each local temporary directory.
+     * Does nothing when no deletion service is available.
+     */
+    protected void cleanupTemporalDirectories() {
+      if(deletionService == null) return;
+
+      LocalDirAllocator lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+
+      try {
+        Iterable<Path> iter = lDirAllocator.getAllLocalPathsToRead(".", systemConf);
+        FileSystem localFS = FileSystem.getLocal(systemConf);
+        for (Path path : iter){
+          // Expand "<dir>/*" to the direct children of the temporary directory.
+          PathData[] items = PathData.expandAsGlob(localFS.makeQualified(new Path(path, "*")).toString(), systemConf);
+
+          ArrayList<Path> paths = new ArrayList<Path>();
+          for (PathData pd : items){
+            paths.add(pd.path);
+          }
+          if(paths.size() == 0) continue;
+
+          deletionService.delete(null, paths.toArray(new Path[paths.size()]));
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
+    public boolean isYarnContainerMode() {
+      return yarnContainerMode;
+    }
+
+    public void setNumClusterNodes(int numClusterNodes) {
+      TajoWorker.this.numClusterNodes.set(numClusterNodes);
+    }
+
+    public int getNumClusterNodes() {
+      return TajoWorker.this.numClusterNodes.get();
+    }
+
+    /** Stores the latest cluster resource summary; guarded by the {@code numClusterNodes} monitor. */
+    public void setClusterResource(TajoMasterProtocol.ClusterResourceSummary clusterResource) {
+      synchronized(numClusterNodes) {
+        TajoWorker.this.clusterResource = clusterResource;
+      }
+    }
+
+    /** @return the latest cluster resource summary, or {@code null} if none has been set */
+    public TajoMasterProtocol.ClusterResourceSummary getClusterResource() {
+      synchronized(numClusterNodes) {
+        return TajoWorker.this.clusterResource;
+      }
+    }
+
+    public InetSocketAddress getTajoMasterAddress() {
+      return tajoMasterAddress;
+    }
+
+    public InetSocketAddress getResourceTrackerAddress() {
+      return workerResourceTrackerAddr;
+    }
+
+    /** @return the bound port of the worker manager service, or 0 when that service is not set */
+    public int getPeerRpcPort() {
+      return getTajoWorkerManagerService() == null ? 0 : getTajoWorkerManagerService().getBindAddr().getPort();
+    }
+
+    public boolean isQueryMasterMode() {
+      return queryMasterMode;
+    }
+
+    public boolean isTaskRunnerMode() {
+      return taskRunnerMode;
+    }
+
+    public TajoSystemMetrics getWorkerSystemMetrics() {
+      return workerSystemMetrics;
+    }
+  }
+
+  /** Stops the worker; equivalent to calling {@code stop()} directly (the JVM is not terminated). */
+  public void stopWorkerForce() {
+    stop();
+  }
+
+  /**
+   * Creates the catalog client from the system configuration.
+   * A failure is only logged, in which case {@code catalogClient} keeps its previous value.
+   */
+  private void connectToCatalog() {
+    try {
+      catalogClient = new CatalogClient(systemConf);
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /** JVM shutdown hook that stops the worker unless it has already been stopped. */
+  private class ShutdownHook implements Runnable {
+    @Override
+    public void run() {
+      if(stopped.get()) {
+        return;
+      }
+
+      final String banner = "============================================";
+      LOG.info(banner);
+      LOG.info("TajoWorker received SIGINT Signal");
+      LOG.info(banner);
+      stop();
+    }
+  }
+
+  /**
+   * Formats a thread label for the thread dump.
+   *
+   * @param id   thread id
+   * @param name thread name, may be {@code null}
+   * @return {@code "<id> (<name>)"}, or just the id when no name is given
+   */
+  String getThreadTaskName(long id, String name) {
+    return name == null ? Long.toString(id) : id + " (" + name + ")";
+  }
+
+  /**
+   * Writes a dump of all threads known to the thread MX bean: state, blocked/waited
+   * counters (and times when contention monitoring is enabled), the lock a thread is
+   * waiting on or blocked by, and up to 20 stack frames per thread.
+   *
+   * @param writer destination of the dump; it is neither flushed nor closed here
+   */
+  public void dumpThread(Writer writer) {
+    final int maxStackDepth = 20;
+    PrintWriter out = new PrintWriter(writer);
+    boolean contentionMonitored = threadBean.isThreadContentionMonitoringEnabled();
+    long[] allThreadIds = threadBean.getAllThreadIds();
+
+    out.println("Process Thread Dump: Tajo Worker");
+    out.println(allThreadIds.length + " active threads");
+    for (long threadId : allThreadIds) {
+      ThreadInfo threadInfo = threadBean.getThreadInfo(threadId, maxStackDepth);
+      if (threadInfo == null) {
+        // No info is available for this id.
+        out.println("  Inactive");
+        continue;
+      }
+
+      String threadLabel = getThreadTaskName(threadInfo.getThreadId(), threadInfo.getThreadName());
+      out.println("Thread " + threadLabel + ":");
+
+      Thread.State threadState = threadInfo.getThreadState();
+      out.println("  State: " + threadState + ",  Blocked count: " + threadInfo.getBlockedCount() +
+          ",  Waited count: " + threadInfo.getWaitedCount());
+      if (contentionMonitored) {
+        out.println("  Blocked time: " + threadInfo.getBlockedTime() + ",  Waited time: " + threadInfo.getWaitedTime());
+      }
+
+      if (threadState == Thread.State.BLOCKED) {
+        String lockOwner = getThreadTaskName(threadInfo.getLockOwnerId(), threadInfo.getLockOwnerName());
+        out.println("  Blocked on " + threadInfo.getLockName() +
+            ", Blocked by " + lockOwner);
+      } else if (threadState == Thread.State.WAITING) {
+        out.println("  Waiting on " + threadInfo.getLockName());
+      }
+
+      out.println("  Stack:");
+      StackTraceElement[] frames = threadInfo.getStackTrace();
+      for (int i = 0; i < frames.length; i++) {
+        out.println("    " + frames[i].toString());
+      }
+      out.println("");
+    }
+  }
+
+  public static List<File> getMountPath() throws IOException {
+    BufferedReader mountOutput = null;
+    try {
+      Process mountProcess = Runtime.getRuntime ().exec("mount");
+      mountOutput = new BufferedReader(new InputStreamReader(mountProcess.getInputStream()));
+      List<File> mountPaths = new ArrayList<File>();
+      while (true) {
+        String line = mountOutput.readLine();
+        if (line == null) {
+          break;
+        }
+
+        int indexStart = line.indexOf(" on /");
+        int indexEnd = line.indexOf(" ", indexStart + 4);
+
+        mountPaths.add(new File(line.substring (indexStart + 4, indexEnd)));
+      }
+      return mountPaths;
+    } catch (IOException e) {
+      e.printStackTrace();
+      throw e;
+    } finally {
+      if(mountOutput != null) {
+        mountOutput.close();
+      }
+    }
+  }
+
+  /**
+   * Command line entry point: loads the system configuration file and starts a worker.
+   * Any failure during startup is logged as fatal and terminates the JVM with status -1.
+   *
+   * @param args command line arguments, passed on to {@code startWorker}
+   */
+  public static void main(String[] args) throws Exception {
+    StringUtils.startupShutdownMessage(TajoWorker.class, args, LOG);
+
+    TajoConf tajoConf = new TajoConf();
+    tajoConf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
+
+    try {
+      TajoWorker tajoWorker = new TajoWorker();
+      tajoWorker.startWorker(tajoConf, args);
+    } catch (Throwable t) {
+      LOG.fatal("Error starting TajoWorker", t);
+      System.exit(-1);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
new file mode 100644
index 0000000..937d886
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.QueryMasterClientProtocol;
+import org.apache.tajo.master.querymaster.Query;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.rpc.BlockingRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * RPC service through which clients ask a query master hosted on this worker about the
+ * status and the result of a query.
+ *
+ * <p>{@link #init(Configuration)} starts a {@link BlockingRpcServer} speaking
+ * {@link QueryMasterClientProtocol} on the port given to the constructor and records the
+ * address the server actually bound to.
+ */
+public class TajoWorkerClientService extends AbstractService {
+  private static final Log LOG = LogFactory.getLog(TajoWorkerClientService.class);
+  // Immutable protobuf constants; shared by all instances instead of rebuilt per service.
+  private static final PrimitiveProtos.BoolProto BOOL_TRUE =
+          PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
+  private static final PrimitiveProtos.BoolProto BOOL_FALSE =
+          PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+
+  private BlockingRpcServer rpcServer;
+  private InetSocketAddress bindAddr;
+  private String addr;
+  private int port;
+  private TajoConf conf;
+  private TajoWorker.WorkerContext workerContext;
+  private TajoWorkerClientProtocolServiceHandler serviceHandler;
+
+  /**
+   * @param workerContext context of the worker hosting this service; used to reach the
+   *                      query master
+   * @param port          port the RPC server is asked to listen on
+   */
+  public TajoWorkerClientService(TajoWorker.WorkerContext workerContext, int port) {
+    super(TajoWorkerClientService.class.getName());
+
+    this.port = port;
+    this.workerContext = workerContext;
+  }
+
+  /**
+   * Creates and starts the RPC server, then records the bound address and port.
+   *
+   * <p>A failure while starting the server is logged and not rethrown; in that case
+   * {@link #getBindAddr()} keeps returning {@code null}.
+   *
+   * @param conf must be a {@link TajoConf}
+   * @throws IllegalArgumentException if {@code conf} is not a {@link TajoConf}
+   */
+  @Override
+  public void init(Configuration conf) {
+    Preconditions.checkArgument(conf instanceof TajoConf);
+    this.conf = (TajoConf) conf;
+    this.serviceHandler = new TajoWorkerClientProtocolServiceHandler();
+
+    // The RPC server is started here rather than in start() because the heartbeat thread
+    // uses bindAddr.
+    try {
+      // TODO initial port num is value of config and find unused port with sequence
+      InetSocketAddress initIsa = new InetSocketAddress("0.0.0.0", port);
+      if (initIsa.getAddress() == null) {
+        throw new IllegalArgumentException("Failed resolve of " + initIsa);
+      }
+
+      // TODO blocking/non-blocking??
+      int workerNum = this.conf.getIntVar(TajoConf.ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM);
+      this.rpcServer = new BlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa, workerNum);
+      this.rpcServer.start();
+
+      this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
+      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+
+      // Remember the port actually bound, which replaces the requested one.
+      this.port = bindAddr.getPort();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+    LOG.info(TajoWorkerClientService.class.getSimpleName() + " is bind to " + addr);
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  /** Shuts down the RPC server, if one was created, and stops the service. */
+  @Override
+  public void stop() {
+    LOG.info("TajoWorkerClientService stopping");
+    if (rpcServer != null) {
+      rpcServer.shutdown();
+    }
+    LOG.info("TajoWorkerClientService stopped");
+    super.stop();
+  }
+
+  /**
+   * @return the address the RPC server is bound to, or {@code null} if the server was not
+   *         started successfully
+   */
+  public InetSocketAddress getBindAddr() {
+    return bindAddr;
+  }
+
+  /** Server-side implementation of the blocking {@link QueryMasterClientProtocol} service. */
+  public class TajoWorkerClientProtocolServiceHandler
+          implements QueryMasterClientProtocol.QueryMasterClientProtocolService.BlockingInterface {
+    /** Not implemented by this service; always returns {@code null}. */
+    @Override
+    public PrimitiveProtos.BoolProto updateSessionVariables(
+            RpcController controller,
+            ClientProtos.UpdateSessionVariableRequest request) throws ServiceException {
+      return null;
+    }
+
+    /**
+     * Returns the result descriptor of a succeeded query, or an error message when the
+     * query is unknown, failed, or has not finished yet.
+     */
+    @Override
+    public ClientProtos.GetQueryResultResponse getQueryResult(
+            RpcController controller,
+            ClientProtos.GetQueryResultRequest request) throws ServiceException {
+      QueryId queryId = new QueryId(request.getQueryId());
+      // The lookup can return null (getQueryStatus checks for it), so do not dereference
+      // the task before testing it.
+      QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true);
+      Query query = queryMasterTask == null ? null : queryMasterTask.getQuery();
+
+      ClientProtos.GetQueryResultResponse.Builder builder = ClientProtos.GetQueryResultResponse.newBuilder();
+      try {
+        builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName());
+      } catch (IOException e) {
+        LOG.warn("Can't get current user name");
+      }
+
+      if (query == null) {
+        builder.setErrorMessage("No Query for " + queryId);
+      } else {
+        switch (query.getState()) {
+          case QUERY_SUCCEEDED:
+            builder.setTableDesc(query.getResultDesc().getProto());
+            break;
+          case QUERY_FAILED:
+          case QUERY_ERROR:
+            builder.setErrorMessage("Query " + queryId + " is failed");
+            // Without this break the failure message was overwritten by the default branch.
+            break;
+          default:
+            builder.setErrorMessage("Query " + queryId + " is still running");
+            break;
+        }
+      }
+      return builder.build();
+    }
+
+    /**
+     * Reports the state, progress and timing of a query.
+     *
+     * <p>The null query id is answered with {@code QUERY_SUCCEEDED}. A query for which no
+     * query master task can be found is answered with {@code QUERY_NOT_ASSIGNED}.
+     */
+    @Override
+    public ClientProtos.GetQueryStatusResponse getQueryStatus(
+            RpcController controller,
+            ClientProtos.GetQueryStatusRequest request) throws ServiceException {
+      ClientProtos.GetQueryStatusResponse.Builder builder
+              = ClientProtos.GetQueryStatusResponse.newBuilder();
+      QueryId queryId = new QueryId(request.getQueryId());
+
+      builder.setQueryId(request.getQueryId());
+
+      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
+        builder.setResultCode(ClientProtos.ResultCode.OK);
+        builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
+      } else {
+        QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId);
+
+        builder.setResultCode(ClientProtos.ResultCode.OK);
+        builder.setQueryMasterHost(bindAddr.getHostName());
+        builder.setQueryMasterPort(bindAddr.getPort());
+
+        if (queryMasterTask == null) {
+          // Retry with the two-argument lookup before giving up.
+          queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true);
+        }
+        if (queryMasterTask == null) {
+          builder.setState(TajoProtos.QueryState.QUERY_NOT_ASSIGNED);
+          return builder.build();
+        }
+
+        // CREATE TABLE and INSERT queries have no result set for the client to fetch.
+        builder.setHasResult(
+            !(queryMasterTask.getQueryTaskContext().getQueryContext().isCreateTable() ||
+                queryMasterTask.getQueryTaskContext().getQueryContext().isInsert())
+        );
+
+        queryMasterTask.touchSessionTime();
+        Query query = queryMasterTask.getQuery();
+
+        if (query != null) {
+          builder.setState(query.getState());
+          builder.setProgress(query.getProgress());
+          builder.setSubmitTime(query.getAppSubmitTime());
+          if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+            builder.setFinishTime(query.getFinishTime());
+          } else {
+            builder.setFinishTime(System.currentTimeMillis());
+          }
+        } else {
+          builder.setState(queryMasterTask.getState());
+          // Protobuf setters reject null, so only set the message when there is one.
+          String errorMessage = queryMasterTask.getErrorMessage();
+          if (errorMessage != null) {
+            builder.setErrorMessage(errorMessage);
+          }
+        }
+      }
+      return builder.build();
+    }
+
+    /** Logs the close request and acknowledges it; nothing else is done here. */
+    @Override
+    public PrimitiveProtos.BoolProto closeQuery (
+            RpcController controller,
+            TajoIdProtos.QueryIdProto request) throws ServiceException {
+      final QueryId queryId = new QueryId(request);
+      LOG.info("Stop Query:" + queryId);
+      return BOOL_TRUE;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
new file mode 100644
index 0000000..392a7cf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+
+/**
+ * RPC service implementing {@link TajoWorkerProtocol} for this worker: it answers pings,
+ * starts execution blocks, kills task attempts and cleans up after queries.
+ *
+ * <p>{@link #init(Configuration)} starts an {@link AsyncRpcServer} on the port given to the
+ * constructor and publishes the bound address in the configuration.
+ */
+public class TajoWorkerManagerService extends CompositeService
+    implements TajoWorkerProtocol.TajoWorkerProtocolService.Interface {
+  private static final Log LOG = LogFactory.getLog(TajoWorkerManagerService.class.getName());
+
+  private AsyncRpcServer rpcServer;
+  private InetSocketAddress bindAddr;
+  private String addr;
+  private int port;
+
+  private TajoWorker.WorkerContext workerContext;
+
+  /**
+   * @param workerContext context of the worker hosting this service
+   * @param port          port the RPC server is asked to listen on
+   */
+  public TajoWorkerManagerService(TajoWorker.WorkerContext workerContext, int port) {
+    super(TajoWorkerManagerService.class.getName());
+    this.workerContext = workerContext;
+    this.port = port;
+  }
+
+  /**
+   * Creates and starts the RPC server, records the bound address and stores it in the
+   * configuration under {@code WORKER_PEER_RPC_ADDRESS}.
+   *
+   * <p>A failure while starting the server is logged and not rethrown.
+   *
+   * @param conf must be a {@link TajoConf}
+   * @throws IllegalArgumentException if {@code conf} is not a {@link TajoConf}
+   */
+  @Override
+  public void init(Configuration conf) {
+    Preconditions.checkArgument(conf instanceof TajoConf);
+    TajoConf tajoConf = (TajoConf) conf;
+    try {
+      // Setup RPC server
+      InetSocketAddress initIsa =
+          new InetSocketAddress("0.0.0.0", port);
+      if (initIsa.getAddress() == null) {
+        throw new IllegalArgumentException("Failed resolve of " + initIsa);
+      }
+
+      int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_RPC_SERVER_WORKER_THREAD_NUM);
+      this.rpcServer = new AsyncRpcServer(TajoWorkerProtocol.class, this, initIsa, workerNum);
+      this.rpcServer.start();
+
+      this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
+      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+
+      // Remember the port actually bound, which replaces the requested one.
+      this.port = bindAddr.getPort();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+    LOG.info("TajoWorkerManagerService is bind to " + addr);
+    tajoConf.setVar(TajoConf.ConfVars.WORKER_PEER_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddr));
+    super.init(tajoConf);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  /** Shuts down the RPC server, if one was created, and stops the service. */
+  @Override
+  public void stop() {
+    if (rpcServer != null) {
+      rpcServer.shutdown();
+    }
+    LOG.info("TajoWorkerManagerService stopped");
+    super.stop();
+  }
+
+  /** @return the address the RPC server is bound to */
+  public InetSocketAddress getBindAddr() {
+    return bindAddr;
+  }
+
+  /** @return the bound address formatted as {@code host:port} */
+  public String getHostAndPort() {
+    return bindAddr.getHostName() + ":" + bindAddr.getPort();
+  }
+
+  /** Liveness probe; always answers {@code TRUE_PROTO}. */
+  @Override
+  public void ping(RpcController controller,
+                   TajoIdProtos.QueryUnitAttemptIdProto attemptId,
+                   RpcCallback<PrimitiveProtos.BoolProto> done) {
+    done.run(TajoWorker.TRUE_PROTO);
+  }
+
+  /**
+   * Starts a task runner for the requested execution block.
+   *
+   * <p>The request fields are handed to the task runner manager as a positional string
+   * array. The callback receives {@code TRUE_PROTO} on success and {@code FALSE_PROTO} if
+   * anything throws.
+   */
+  @Override
+  public void executeExecutionBlock(RpcController controller,
+                                    TajoWorkerProtocol.RunExecutionBlockRequestProto request,
+                                    RpcCallback<PrimitiveProtos.BoolProto> done) {
+    workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc();
+    try {
+      String[] params = new String[7];
+      params[0] = "standby";  //mode(never used)
+      params[1] = request.getExecutionBlockId();
+      // NodeId has a form of hostname:port.
+      params[2] = request.getNodeId();
+      params[3] = request.getContainerId();
+
+      // QueryMaster's address
+      params[4] = request.getQueryMasterHost();
+      params[5] = String.valueOf(request.getQueryMasterPort());
+      params[6] = request.getQueryOutputPath();
+      workerContext.getTaskRunnerManager().startTask(params);
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  /**
+   * Kills the task attempt named by {@code request}, if this worker still knows it.
+   *
+   * <p>The callback is always answered with {@code TRUE_PROTO}, including when no matching
+   * task is found.
+   */
+  @Override
+  public void killTaskAttempt(RpcController controller, TajoIdProtos.QueryUnitAttemptIdProto request,
+                              RpcCallback<PrimitiveProtos.BoolProto> done) {
+    QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request);
+    Task task = workerContext.getTaskRunnerManager().findTaskByQueryUnitAttemptId(attemptId);
+    if (task != null) {
+      task.kill();
+    } else {
+      // Guard the lookup: dereferencing a missing task would throw before the callback
+      // runs and leave the caller without an answer.
+      LOG.warn("No task found to kill for " + attemptId);
+    }
+    done.run(TajoWorker.TRUE_PROTO);
+  }
+
+  /** Asks the worker context to clean up after the given query, then acknowledges. */
+  @Override
+  public void cleanup(RpcController controller, TajoIdProtos.QueryIdProto request,
+                      RpcCallback<PrimitiveProtos.BoolProto> done) {
+    workerContext.cleanup(new QueryId(request).toString());
+    done.run(TajoWorker.TRUE_PROTO);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
new file mode 100644
index 0000000..30f56ee
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -0,0 +1,762 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.engine.query.QueryUnitRequest;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.ApplicationIdUtils;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.text.NumberFormat;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public class Task {
+  private static final Log LOG = LogFactory.getLog(Task.class);
+  private static final float FETCHER_PROGRESS = 0.5f;
+
+  private final TajoConf systemConf;
+  private final QueryContext queryContext;
+  private final FileSystem localFS;
+  private TaskRunner.TaskRunnerContext taskRunnerContext;
+  private final QueryMasterProtocolService.Interface masterProxy;
+  private final LocalDirAllocator lDirAllocator;
+  private final QueryUnitAttemptId taskId;
+
+  private final Path taskDir;
+  private final QueryUnitRequest request;
+  private TaskAttemptContext context;
+  private List<Fetcher> fetcherRunners;
+  private LogicalNode plan;
+  private final Map<String, TableDesc> descs = Maps.newHashMap();
+  private PhysicalExec executor;
+  private boolean interQuery;
+  private boolean killed = false;
+  private boolean aborted = false;
+  private boolean stopped = false;
+  private final Reporter reporter;
+  private Path inputTableBaseDir;
+
+  private static int completedTasksNum = 0;
+  private static int succeededTasksNum = 0;
+  private static int killedTasksNum = 0;
+  private static int failedTasksNum = 0;
+
+  private long startTime;
+  private long finishTime;
+
+  private final TableStats inputStats;
+
+  // TODO - to be refactored
+  private ShuffleType shuffleType = null;
+  private Schema finalSchema = null;
+  private TupleComparator sortComp = null;
+  private ClientSocketChannelFactory channelFactory = null;
+
+  static final String OUTPUT_FILE_PREFIX="part-";
+  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
+      new ThreadLocal<NumberFormat>() {
+        @Override
+        public NumberFormat initialValue() {
+          NumberFormat fmt = NumberFormat.getInstance();
+          fmt.setGroupingUsed(false);
+          fmt.setMinimumIntegerDigits(2);
+          return fmt;
+        }
+      };
+  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK =
+      new ThreadLocal<NumberFormat>() {
+        @Override
+        public NumberFormat initialValue() {
+          NumberFormat fmt = NumberFormat.getInstance();
+          fmt.setGroupingUsed(false);
+          fmt.setMinimumIntegerDigits(6);
+          return fmt;
+        }
+      };
+
+  /**
+   * Builds a task attempt from a query unit request.
+   *
+   * <p>Besides storing its arguments, the constructor creates the attempt context, starts
+   * the status reporter thread, deserializes the logical plan and records the table
+   * descriptor of every scan node, configures either the shuffle (inter-query) or the
+   * final output path, and leaves the attempt in {@code TA_PENDING}.
+   *
+   * @param taskId      id of this task attempt
+   * @param worker      context of the task runner executing this task
+   * @param masterProxy RPC proxy used to report to the query master
+   * @param request     the query unit request describing the work
+   * @throws IOException declared by the signature
+   */
+  public Task(QueryUnitAttemptId taskId,
+              final TaskRunner.TaskRunnerContext worker,
+              final QueryMasterProtocolService.Interface masterProxy,
+              final QueryUnitRequest request) throws IOException {
+    this.request = request;
+    this.taskId = taskId;
+
+    this.systemConf = worker.getConf();
+    this.queryContext = request.getQueryContext();
+    this.taskRunnerContext = worker;
+    this.masterProxy = masterProxy;
+    this.localFS = worker.getLocalFS();
+    this.lDirAllocator = worker.getLocalDirAllocator();
+    // Task directory: <runner base dir>/<query unit id>_<attempt id>
+    this.taskDir = StorageUtil.concatPath(taskRunnerContext.getBaseDir(),
+        taskId.getQueryUnitId().getId() + "_" + taskId.getId());
+
+    this.context = new TaskAttemptContext(systemConf, taskId,
+        request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
+    this.context.setDataChannel(request.getDataChannel());
+    this.context.setEnforcer(request.getEnforcer());
+    this.inputStats = new TableStats();
+
+    // The reporter thread is started here, before the rest of the constructor has run.
+    this.reporter = new Reporter(taskId, masterProxy);
+    this.reporter.startCommunicationThread();
+
+    // Rebuild the logical plan from JSON and index the table descriptor of each scan node
+    // by the scan's canonical name.
+    plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class);
+    LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
+    for (LogicalNode node : scanNode) {
+      ScanNode scan = (ScanNode)node;
+      descs.put(scan.getCanonicalName(), scan.getTableDesc());
+    }
+
+    interQuery = request.getProto().getInterQuery();
+    if (interQuery) {
+      context.setInterQuery();
+      this.shuffleType = context.getDataChannel().getShuffleType();
+
+      if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
+        // Range shuffle: derive the schema and comparator from the sort keys of the
+        // top-most SORT node of the plan.
+        SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
+        this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
+        this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
+      }
+    } else {
+      // The final result of a task is written to a file named part-ss-nnnnnn, where ss is
+      // the execution block id (at least 2 digits) and nnnnnn is the query unit id
+      // (at least 6 digits), under <staging dir>/<RESULT_DIR_NAME>.
+      Path outFilePath = StorageUtil.concatPath(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME,
+          OUTPUT_FILE_PREFIX +
+          OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getQueryUnitId().getExecutionBlockId().getId()) + "-" +
+          OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()));
+      LOG.info("Output File Path: " + outFilePath);
+      context.setOutputPath(outFilePath);
+    }
+
+    context.setState(TaskAttemptState.TA_PENDING);
+    // Summary of the initialized task for the worker log.
+    LOG.info("==================================");
+    LOG.info("* Subquery " + request.getId() + " is initialized");
+    LOG.info("* InterQuery: " + interQuery
+        + (interQuery ? ", Use " + this.shuffleType + " shuffle":""));
+
+    LOG.info("* Fragments (num: " + request.getFragments().size() + ")");
+    LOG.info("* Fetches (total:" + request.getFetches().size() + ") :");
+    for (Fetch f : request.getFetches()) {
+      LOG.info("Table Id: " + f.getName() + ", url: " + f.getUrls());
+    }
+    LOG.info("* Local task dir: " + taskDir);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("* plan:\n");
+      LOG.debug(plan.toString());
+    }
+    LOG.info("==================================");
+  }
+
+  /**
+   * Prepares the local directories of this task and sets up its fetchers.
+   *
+   * <p>The task directory is always created. When the request carries fetches, a writable
+   * local directory for the attempt is allocated and one sub-directory per input table is
+   * created beneath it. Finally the fetch runners are built from the request.
+   *
+   * @throws IOException if a directory cannot be allocated or created
+   */
+  public void init() throws IOException {
+    // temporary working directory of the task
+    localFS.mkdirs(taskDir);
+
+    if (request.getFetches().size() > 0) {
+      String attemptDirName = getTaskAttemptDir(context.getTaskId()).toString();
+      inputTableBaseDir = localFS.makeQualified(
+          lDirAllocator.getLocalPathForWrite(attemptDirName, systemConf));
+      localFS.mkdirs(inputTableBaseDir);
+
+      for (String inputTable : context.getInputTables()) {
+        Path inputTableDir = new Path(inputTableBaseDir, inputTable);
+        if (localFS.exists(inputTableDir)) {
+          continue;
+        }
+        LOG.info("the directory is created  " + inputTableDir.toUri());
+        localFS.mkdirs(inputTableDir);
+      }
+    }
+
+    // set up the fetchers that localize the intermediate data
+    localize(request);
+  }
+
+  /** @return the attempt id this task was constructed with */
+  public QueryUnitAttemptId getTaskId() {
+    return taskId;
+  }
+
+  /** @return the logger shared by all {@code Task} instances */
+  public static Log getLog() {
+    return LOG;
+  }
+
+  /**
+   * Builds the fetch runners for the fetches of {@code request} and stores them in
+   * {@code fetcherRunners}.
+   *
+   * @param request the query unit request whose fetches are to be localized
+   * @throws IOException declared by the signature
+   */
+  public void localize(QueryUnitRequest request) throws IOException {
+    fetcherRunners = getFetchRunners(context, request.getFetches());
+  }
+
+  /** @return the attempt id held by the attempt context */
+  public QueryUnitAttemptId getId() {
+    return context.getTaskId();
+  }
+
+  /** @return the current state of the attempt, read from the attempt context */
+  public TaskAttemptState getStatus() {
+    return context.getState();
+  }
+
+  // Note: the value labelled "queryId" is the attempt id returned by getId().
+  public String toString() {
+    return "queryId: " + this.getId() + " status: " + this.getStatus();
+  }
+
+  /** Sets the state of the attempt on the attempt context. */
+  public void setState(TaskAttemptState status) {
+    context.setState(status);
+  }
+
+  /** @return the attempt context of this task */
+  public TaskAttemptContext getContext() {
+    return context;
+  }
+
+  /**
+   * @return {@code true} if at least one fetch runner exists; {@code fetcherRunners} is only
+   *         assigned by {@link #localize(QueryUnitRequest)}, so that must have run first
+   */
+  public boolean hasFetchPhase() {
+    return fetcherRunners.size() > 0;
+  }
+
+  /** Submits one {@code FetchRunner} per fetcher to the task runner's fetch launcher. */
+  public void fetch() {
+    for (Fetcher f : fetcherRunners) {
+      taskRunnerContext.getFetchLauncher().submit(new FetchRunner(context, f));
+    }
+  }
+
+  /**
+   * Marks the task as killed, stops the attempt context, sets the state to
+   * {@code TA_KILLED} and releases the channel factory. The {@code killed} flag is what
+   * ends the executor loop in {@link #run()}.
+   */
+  public void kill() {
+    killed = true;
+    context.stop();
+    context.setState(TaskAttemptState.TA_KILLED);
+    releaseChannelFactory();
+  }
+
+  /**
+   * Marks the task as aborted, stops the attempt context and releases the channel factory.
+   * Unlike {@link #kill()}, the attempt state is not changed here.
+   */
+  public void abort() {
+    aborted = true;
+    context.stop();
+    releaseChannelFactory();
+  }
+
+  public void cleanUp() {
+    // remove itself from worker
+    if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
+      try {
+        localFS.delete(context.getWorkDir(), true);
+        synchronized (taskRunnerContext.getTasks()) {
+          taskRunnerContext.getTasks().remove(this.getId());
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    } else {
+      LOG.error("QueryUnitAttemptId: " + context.getTaskId() + " status: " + context.getState());
+    }
+  }
+
+  /**
+   * Builds a status report of this attempt: worker name, attempt id, progress, state,
+   * the refreshed input statistics and, when available, the result statistics.
+   *
+   * @return the status message to send to the query master
+   */
+  public TaskStatusProto getReport() {
+    TaskStatusProto.Builder status = TaskStatusProto.newBuilder();
+    status.setWorkerName(taskRunnerContext.getNodeId());
+    status.setId(context.getTaskId().getProto());
+    status.setProgress(context.getProgress());
+    status.setState(context.getState());
+    status.setInputStats(reloadInputStats());
+
+    if (context.getResultStats() != null) {
+      status.setResultStats(context.getResultStats().getProto());
+    }
+
+    return status.build();
+  }
+
+  /**
+   * Refreshes {@code inputStats} from the running executor, if there is one and it reports
+   * statistics, and returns the statistics as a protobuf message. The whole operation
+   * holds the monitor of {@code inputStats}.
+   *
+   * @return the current input statistics
+   */
+  private CatalogProtos.TableStatsProto reloadInputStats() {
+    synchronized (inputStats) {
+      if (this.executor != null) {
+        TableStats latest = this.executor.getInputStats();
+        if (latest != null) {
+          inputStats.setValues(latest);
+        }
+      }
+      return inputStats.getProto();
+    }
+  }
+
+  /**
+   * Builds the completion report of this attempt: attempt id, refreshed input statistics,
+   * result statistics (empty statistics when the context has none) and one
+   * {@code ShuffleFileOutput} per shuffle output partition, carrying the partition id.
+   *
+   * @return the completion report to send to the query master
+   */
+  private TaskCompletionReport getTaskCompletionReport() {
+    TaskCompletionReport.Builder report = TaskCompletionReport.newBuilder();
+    report.setId(context.getTaskId().getProto());
+    report.setInputStats(reloadInputStats());
+
+    if (!context.hasResultStats()) {
+      report.setResultStats(new TableStats().getProto());
+    } else {
+      report.setResultStats(context.getResultStats().getProto());
+    }
+
+    Iterator<Entry<Integer,String>> outputs = context.getShuffleFileOutputs();
+    while (outputs.hasNext()) {
+      Entry<Integer,String> output = outputs.next();
+      ShuffleFileOutput.Builder partition = ShuffleFileOutput.newBuilder();
+      partition.setPartId(output.getKey());
+      report.addShuffleFileOutputs(partition.build());
+    }
+
+    return report.build();
+  }
+
+  /**
+   * Blocks until the fetch latch of the context is released, then turns the fetched data of
+   * every input table into file fragments, registers them on the context and releases the
+   * channel factory.
+   *
+   * @throws InterruptedException if interrupted while waiting on the fetch latch
+   * @throws IOException          if the fetched data cannot be localized
+   */
+  private void waitForFetch() throws InterruptedException, IOException {
+    context.getFetchLatch().await();
+    LOG.info(context.getTaskId() + " All fetches are done!");
+
+    // Iterate over a copy of the input table names.
+    Collection<String> tableNames = Lists.newArrayList(context.getInputTables());
+    for (String tableName : tableNames) {
+      File fetchedDir = new File(context.getFetchIn(), tableName);
+      FileFragment[] fragments =
+          localizeFetchedData(fetchedDir, tableName, descs.get(tableName).getMeta());
+      context.updateAssignedFragments(tableName, fragments);
+    }
+
+    releaseChannelFactory();
+  }
+
+  /**
+   * Executes the task attempt and reports its outcome to the query master.
+   *
+   * <p>The attempt is set to {@code TA_RUNNING}; if it has a fetch phase the method waits
+   * for the fetches, then, when fragments are assigned, builds the physical plan and pulls
+   * tuples until the executor is exhausted or the task is killed. Any exception marks the
+   * task as aborted. The {@code finally} block always runs: it sets the final state
+   * ({@code TA_KILLED}, {@code TA_FAILED} or {@code TA_SUCCEEDED}), sends the matching
+   * message to the master, stops the reporter thread, updates the static task counters and
+   * calls {@link #cleanupTask()}.
+   */
+  public void run() {
+    startTime = System.currentTimeMillis();
+    String errorMessage = null;
+    try {
+      context.setState(TaskAttemptState.TA_RUNNING);
+
+      if (context.hasFetchPhase()) {
+        // If the fetch is still in progress, the query unit must wait for
+        // complete.
+        waitForFetch();
+        // A completed fetch phase counts as FETCHER_PROGRESS (0.5) of the total progress.
+        context.setFetcherProgress(FETCHER_PROGRESS);
+        context.setProgress(FETCHER_PROGRESS);
+      }
+
+      if (context.getFragmentSize() > 0) {
+        this.executor = taskRunnerContext.getTQueryEngine().
+            createPlan(context, plan);
+        this.executor.init();
+        // Drain the executor; the loop ends early once kill() has set the killed flag.
+        while(!killed && executor.next() != null) {
+        }
+        this.executor.close();
+        // Take a last snapshot of the input statistics before dropping the executor.
+        reloadInputStats();
+        this.executor = null;
+      }
+    } catch (Exception e) {
+      // errorMessage will be sent to master.
+      // NOTE(review): this also catches the InterruptedException declared by waitForFetch();
+      // the interrupt status is not restored here.
+      errorMessage = ExceptionUtils.getStackTrace(e);
+      LOG.error(errorMessage);
+      aborted = true;
+    } finally {
+      context.setProgress(1.0f);
+      stopped = true;
+      // NOTE(review): the task counters are plain static ints incremented without
+      // synchronization.
+      completedTasksNum++;
+
+      if (killed || aborted) {
+        // An unsuccessful attempt reports no progress.
+        context.setExecutorProgress(0.0f);
+        context.setProgress(0.0f);
+        if(killed) {
+          context.setState(TaskAttemptState.TA_KILLED);
+          masterProxy.statusUpdate(null, getReport(), NullCallback.get());
+          killedTasksNum++;
+        } else {
+          context.setState(TaskAttemptState.TA_FAILED);
+          TaskFatalErrorReport.Builder errorBuilder =
+              TaskFatalErrorReport.newBuilder()
+                  .setId(getId().getProto());
+          // errorMessage stays null when the task was aborted via abort() rather than by
+          // an exception caught above.
+          if (errorMessage != null) {
+            errorBuilder.setErrorMessage(errorMessage);
+          }
+
+          masterProxy.fatalError(null, errorBuilder.build(), NullCallback.get());
+          failedTasksNum++;
+        }
+
+        // stopping the status report
+        try {
+          reporter.stopCommunicationThread();
+        } catch (InterruptedException e) {
+          LOG.warn(e);
+        }
+
+      } else {
+        // if successful
+        context.setProgress(1.0f);
+        context.setState(TaskAttemptState.TA_SUCCEEDED);
+
+        // stopping the status report
+        try {
+          reporter.stopCommunicationThread();
+        } catch (InterruptedException e) {
+          LOG.warn(e);
+        }
+
+        // On success the reporter is stopped before the completion report is sent.
+        TaskCompletionReport report = getTaskCompletionReport();
+        masterProxy.done(null, report, NullCallback.get());
+        succeededTasksNum++;
+      }
+
+      finishTime = System.currentTimeMillis();
+
+      // cleanupTask() nulls context and taskRunnerContext, so it must come after every use
+      // of them above.
+      cleanupTask();
+      LOG.info("Task Counter - total:" + completedTasksNum + ", succeeded: " + succeededTasksNum
+          + ", killed: " + killedTasksNum + ", failed: " + failedTasksNum);
+    }
+  }
+
+  /**
+   * Records the history of this task on the task runner, unregisters the task and drops
+   * the references it holds.
+   *
+   * <p>After this call {@code taskRunnerContext}, {@code executor}, {@code plan} and
+   * {@code context} are {@code null}, so methods that dereference them (for example
+   * {@link #getId()}, {@link #getStatus()}, {@link #hashCode()}) must not be called
+   * afterwards.
+   */
+  public void cleanupTask() {
+    // The history snapshot reads context and fetcherRunners, so take it before they are
+    // cleared below.
+    taskRunnerContext.addTaskHistory(getId(), getTaskHistory());
+    taskRunnerContext.getTasks().remove(getId());
+    taskRunnerContext = null;
+
+    fetcherRunners.clear();
+    executor = null;
+    plan = null;
+    context = null;
+    releaseChannelFactory();
+  }
+
+  public TaskHistory getTaskHistory() {
+    TaskHistory taskHistory = new TaskHistory();
+    taskHistory.setStartTime(startTime);
+    taskHistory.setFinishTime(finishTime);
+    if (context.getOutputPath() != null) {
+      taskHistory.setOutputPath(context.getOutputPath().toString());
+    }
+
+    if (context.getWorkDir() != null) {
+      taskHistory.setWorkingPath(context.getWorkDir().toString());
+    }
+
+    try {
+      taskHistory.setStatus(getStatus().toString());
+      taskHistory.setProgress(context.getProgress());
+
+      taskHistory.setInputStats(new TableStats(reloadInputStats()));
+      if (context.getResultStats() != null) {
+        taskHistory.setOutputStats((TableStats)context.getResultStats().clone());
+      }
+
+      if (hasFetchPhase()) {
+        Map<URI, TaskHistory.FetcherHistory> fetcherHistories = new HashMap<URI, TaskHistory.FetcherHistory>();
+
+        for(Fetcher eachFetcher: fetcherRunners) {
+          TaskHistory.FetcherHistory fetcherHistory = new TaskHistory.FetcherHistory();
+          fetcherHistory.setStartTime(eachFetcher.getStartTime());
+          fetcherHistory.setFinishTime(eachFetcher.getFinishTime());
+          fetcherHistory.setStatus(eachFetcher.getStatus());
+          fetcherHistory.setUri(eachFetcher.getURI().toString());
+          fetcherHistory.setFileLen(eachFetcher.getFileLen());
+          fetcherHistory.setMessageReceiveCount(eachFetcher.getMessageReceiveCount());
+
+          fetcherHistories.put(eachFetcher.getURI(), fetcherHistory);
+        }
+
+        taskHistory.setFetchers(fetcherHistories);
+      }
+    } catch (Exception e) {
+      taskHistory.setStatus(StringUtils.stringifyException(e));
+      e.printStackTrace();
+    }
+
+    return taskHistory;
+  }
+
+  /**
+   * Returns the hash code of the attempt context, or 0 once the context has been
+   * released by {@link #cleanupTask()}.
+   */
+  @Override
+  public int hashCode() {
+    // cleanupTask() sets context to null; do not fail with a NullPointerException afterwards.
+    return context == null ? 0 : context.hashCode();
+  }
+
+  /**
+   * Two tasks are equal when their attempt contexts are equal. A task whose context
+   * has been released by {@link #cleanupTask()} is only equal to itself.
+   *
+   * @param obj the object to compare with
+   * @return true if <code>obj</code> is this task or a task with an equal context
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj instanceof Task) {
+      Task other = (Task) obj;
+      // context is null after cleanupTask(); guard against a NullPointerException.
+      return this.context != null && this.context.equals(other.context);
+    }
+    return false;
+  }
+
+  /**
+   * Lists the entries of a local directory and wraps every entry whose length is
+   * not zero into a fragment that spans the entire file.
+   *
+   * @param file the local directory to list
+   * @param name the table name given to every created fragment
+   * @param meta not used by this method
+   * @return one fragment per non-empty entry, in listing order
+   * @throws IOException if the local file system cannot be obtained or listed
+   */
+  private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
+      throws IOException {
+    // Work on a copy of the system configuration whose default file system is the local one.
+    Configuration localConf = new Configuration(systemConf);
+    localConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
+    FileSystem localFs = FileSystem.get(localConf);
+
+    List<FileFragment> fragments = new ArrayList<FileFragment>();
+    for (FileStatus status : localFs.listStatus(new Path(file.getAbsolutePath()))) {
+      if (status.getLen() != 0) {
+        fragments.add(new FileFragment(name, status.getPath(), 0L, status.getLen()));
+      }
+    }
+
+    return fragments.toArray(new FileFragment[fragments.size()]);
+  }
+
+  /**
+   * Runs a single {@link Fetcher}, retrying a failed fetch up to a fixed number of
+   * attempts with a fixed pause between attempts. Whatever the outcome, the
+   * enclosing task is notified through <code>fetcherFinished</code> exactly once so
+   * that the fetch latch is always counted down.
+   */
+  private class FetchRunner implements Runnable {
+    private final TaskAttemptContext ctx;
+    private final Fetcher fetcher;
+
+    /**
+     * @param ctx the attempt context whose fetch latch is counted down when this runner ends
+     * @param fetcher the fetcher to execute
+     */
+    public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
+      this.ctx = ctx;
+      this.fetcher = fetcher;
+    }
+
+    @Override
+    public void run() {
+      int retryNum = 0;
+      int maxRetryNum = 5;
+      int retryWaitTime = 1000; // milliseconds to pause before each retry
+
+      try { // for releasing fetch latch
+        while(retryNum < maxRetryNum) {
+          if (retryNum > 0) {
+            try {
+              Thread.sleep(retryWaitTime);
+            } catch (InterruptedException e) {
+              // Someone asked this thread to stop: keep the interrupt status for
+              // the caller and give up instead of continuing to retry.
+              LOG.warn("Interrupted while waiting to retry the fetch: " + fetcher.getURI(), e);
+              Thread.currentThread().interrupt();
+              break;
+            }
+            LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
+          }
+          try {
+            File fetched = fetcher.get();
+            if (fetched != null) {
+              // A non-null result ends the retry loop.
+              break;
+            }
+          } catch (IOException e) {
+            LOG.error("Fetch failed: " + fetcher.getURI(), e);
+          }
+          retryNum++;
+        }
+      } finally {
+        fetcherFinished(ctx);
+      }
+
+      if (retryNum == maxRetryNum) {
+        LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
+      }
+    }
+  }
+
+  /**
+   * Scales the share of finished fetchers by <code>FETCHER_PROGRESS</code>.
+   *
+   * @param totalFetcher the total number of fetchers
+   * @param remainFetcher the number of fetchers that have not finished yet
+   * @return (totalFetcher - remainFetcher) / totalFetcher * FETCHER_PROGRESS
+   */
+  @VisibleForTesting
+  public static float adjustFetchProcess(int totalFetcher, int remainFetcher) {
+    final float finishedFetcher = (float) (totalFetcher - remainFetcher);
+    final float finishedRatio = finishedFetcher / (float) totalFetcher;
+    return finishedRatio * FETCHER_PROGRESS;
+  }
+
+  private synchronized void fetcherFinished(TaskAttemptContext ctx) {
+    int fetcherSize = fetcherRunners.size();
+    if(fetcherSize == 0) {
+      return;
+    }
+    try {
+      int numRunningFetcher = (int)(ctx.getFetchLatch().getCount()) - 1;
+
+      if (numRunningFetcher == 0) {
+        context.setProgress(FETCHER_PROGRESS);
+      } else {
+        context.setProgress(adjustFetchProcess(fetcherSize, numRunningFetcher));
+      }
+    } finally {
+      ctx.getFetchLatch().countDown();
+    }
+  }
+
+  /**
+   * Shuts down the channel factory held by this task, releases its external
+   * resources and clears the reference. Does nothing if no factory is held.
+   */
+  private void releaseChannelFactory(){
+    if (channelFactory == null) {
+      return;
+    }
+
+    channelFactory.shutdown();
+    channelFactory.releaseExternalResources();
+    channelFactory = null;
+  }
+
+  /**
+   * Creates one {@link Fetcher} per requested fetch and registers the fetch phase
+   * on the given context. Each fetcher stores its data in a file named
+   * <code>in_&lt;index&gt;</code> inside a directory named after the fetch, below the
+   * local input directory of the task attempt.
+   *
+   * @param ctx the attempt context; its configuration supplies the worker count
+   *            passed to the channel factory
+   * @param fetches the fetches to create runners for
+   * @return the created fetchers in the order of <code>fetches</code>; an empty
+   *         list when there is nothing to fetch
+   * @throws IOException if the local input directory cannot be resolved
+   */
+  private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
+                                        List<Fetch> fetches) throws IOException {
+    if (fetches.isEmpty()) {
+      return Lists.newArrayList();
+    }
+
+    // Release the factory that may be left over from an earlier call before creating a new one.
+    releaseChannelFactory();
+
+    int workerNum = ctx.getConf().getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM);
+    channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", workerNum);
+
+    Path inputDir = lDirAllocator.getLocalPathToRead(
+        getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
+    String inputDirName = inputDir.toString();
+
+    List<Fetcher> runnerList = Lists.newArrayList();
+    int fetchIndex = 0;
+    for (Fetch fetch : fetches) {
+      File storeDir = new File(inputDirName, fetch.getName());
+      if (!storeDir.exists()) {
+        storeDir.mkdirs();
+      }
+
+      File storeFile = new File(storeDir, "in_" + fetchIndex);
+      runnerList.add(new Fetcher(URI.create(fetch.getUrls()), storeFile, channelFactory));
+      fetchIndex++;
+    }
+
+    ctx.addFetchPhase(runnerList.size(), new File(inputDirName));
+    return runnerList;
+  }
+
+  /**
+   * Periodically reports this task to the query master from a dedicated thread:
+   * a full status update when the context says the progress has changed, a plain
+   * ping otherwise.
+   */
+  protected class Reporter {
+    private QueryMasterProtocolService.Interface masterStub;
+    // Created lazily by startCommunicationThread(); also used as the monitor for wait/notify.
+    private Thread pingThread;
+    // Set once by stopCommunicationThread(); checked at the top of every loop iteration.
+    private AtomicBoolean stop = new AtomicBoolean(false);
+    // Timeout, in milliseconds, of the wait between two reports.
+    private static final int PROGRESS_INTERVAL = 3000;
+    private QueryUnitAttemptId taskId;
+
+    /**
+     * @param taskId the id sent with every ping
+     * @param masterStub the stub used for status updates and pings
+     */
+    public Reporter(QueryUnitAttemptId taskId, QueryMasterProtocolService.Interface masterStub) {
+      this.taskId = taskId;
+      this.masterStub = masterStub;
+    }
+
+    /**
+     * Creates the body of the reporter thread. The loop runs until either this
+     * reporter's <code>stop</code> flag or the enclosing task's <code>stopped</code>
+     * flag is set, or until reporting has failed <code>MAX_RETRIES</code> times.
+     */
+    Runnable createReporterThread() {
+
+      return new Runnable() {
+        final int MAX_RETRIES = 3;
+        // Decremented on every failed report and never reset, so the limit applies
+        // to the total number of failures, not to consecutive ones.
+        int remainingRetries = MAX_RETRIES;
+        @Override
+        public void run() {
+          while (!stop.get() && !stopped) {
+            // Step 1: refresh the context's progress from the executor. Failures
+            // here are only logged and do not count against the retries.
+            try {
+              if(executor != null && context.getProgress() < 1.0f) {
+                float progress = executor.getProgress();
+                context.setExecutorProgress(progress);
+              }
+            } catch (Throwable t) {
+              LOG.error("Get progress error: " + t.getMessage(), t);
+            }
+
+            // Step 2: send a status update if the progress changed, a ping otherwise.
+            try {
+              if (context.isPorgressChanged()) {
+                masterStub.statusUpdate(null, getReport(), NullCallback.get());
+              } else {
+                masterStub.ping(null, taskId.getProto(), NullCallback.get());
+              }
+            } catch (Throwable t) {
+              LOG.error(t.getMessage(), t);
+              remainingRetries -=1;
+              if (remainingRetries == 0) {
+                ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
+                LOG.warn("Last retry, exiting ");
+                // Propagates out of run() and thereby ends the reporter thread.
+                throw new RuntimeException(t);
+              }
+            } finally {
+              // Step 3: wait for the next interval unless the retries are used up.
+              // stopCommunicationThread() ends this wait early via notifyAll().
+              if (remainingRetries > 0) {
+                synchronized (pingThread) {
+                  try {
+                    pingThread.wait(PROGRESS_INTERVAL);
+                  } catch (InterruptedException e) {
+                    // Ignored: the loop condition is re-evaluated right after.
+                    // NOTE(review): the interrupt status is not restored here.
+                  }
+                }
+              }
+            }
+          }
+        }
+      };
+    }
+
+    /**
+     * Starts the reporter thread if it has not been created yet; later calls are no-ops.
+     */
+    public void startCommunicationThread() {
+      if (pingThread == null) {
+        pingThread = new Thread(createReporterThread());
+        pingThread.setName("communication thread");
+        pingThread.start();
+      }
+    }
+
+    /**
+     * Asks the reporter thread to stop. Only the first call has an effect. The
+     * method does not wait for the thread to terminate.
+     *
+     * @throws InterruptedException declared in the signature; nothing in the
+     *         current body throws it
+     */
+    public void stopCommunicationThread() throws InterruptedException {
+      if(stop.getAndSet(true)){
+        return;
+      }
+
+      if (pingThread != null) {
+        // Taking the monitor is required to call notifyAll(). The reporter only
+        // holds this monitor while it waits between reports, not during the
+        // ping or statusUpdate call.
+        synchronized(pingThread) {
+          // Wake the reporter if it is waiting; no interrupt is sent. If it is in
+          // the middle of a report it sees the stop flag on its next loop check.
+          pingThread.notifyAll();
+        }
+      }
+    }
+  }
+
+  public static final String FILECACHE = "filecache";
+  public static final String APPCACHE = "appcache";
+  public static final String USERCACHE = "usercache";
+
+  String fileCache;
+  public String getFileCacheDir() {
+    fileCache = USERCACHE + "/" + "hyunsik" + "/" + APPCACHE + "/" +
+        ConverterUtils.toString(ApplicationIdUtils.queryIdToAppId(taskId.getQueryUnitId().getExecutionBlockId().getQueryId())) +
+        "/" + "output";
+    return fileCache;
+  }
+
+  /**
+   * Builds the relative input directory of a task attempt:
+   * <code>&lt;query id&gt;/in/&lt;execution block id&gt;/&lt;query unit id&gt;/&lt;attempt id&gt;</code>.
+   *
+   * @param quid the id of the task attempt
+   * @return the path assembled from the components of <code>quid</code>
+   */
+  public static Path getTaskAttemptDir(QueryUnitAttemptId quid) {
+    return StorageUtil.concatPath(
+        quid.getQueryUnitId().getExecutionBlockId().getQueryId().toString(),
+        "in",
+        quid.getQueryUnitId().getExecutionBlockId().toString(),
+        String.valueOf(quid.getQueryUnitId().getId()),
+        String.valueOf(quid.getId()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
new file mode 100644
index 0000000..6f3281c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+
+import java.io.File;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+
+/**
+ * Holds the state of a single task attempt while it is executed: its configuration,
+ * the input fragments grouped by table id, the attempt state, progress, fetch-phase
+ * bookkeeping, shuffle outputs and result statistics.
+ *
+ * Two contexts are equal when their task attempt ids are equal.
+ */
+public class TaskAttemptContext {
+  private static final Log LOG = LogFactory.getLog(TaskAttemptContext.class);
+  private final TajoConf conf;
+  /** input fragments of this attempt, grouped by table id */
+  private final Map<String, List<FragmentProto>> fragmentMap = Maps.newHashMap();
+
+  private TaskAttemptState state;
+  private TableStats resultStats;
+  private final QueryUnitAttemptId queryId;
+  private final Path workDir;
+  private boolean needFetch = false;
+  private CountDownLatch doneFetchPhaseSignal;
+  /** volatile because the progress is written and read by different threads */
+  private volatile float progress = 0.0f;
+  private float fetcherProgress = 0.0f;
+  private final AtomicBoolean progressChanged = new AtomicBoolean(false);
+
+  /** a map of shuffled file outputs */
+  private final Map<Integer, String> shuffleFileOutputs;
+  private File fetchIn;
+  private boolean stopped = false;
+  private boolean interQuery = false;
+  private Path outputPath;
+  private DataChannel dataChannel;
+  private Enforcer enforcer;
+
+  /**
+   * Creates a context in the {@link TaskAttemptState#TA_PENDING} state.
+   *
+   * @param conf the configuration of the attempt
+   * @param queryId the id of the task attempt
+   * @param fragments the input fragments; they are grouped by their id
+   * @param workDir the working directory of the attempt
+   */
+  public TaskAttemptContext(TajoConf conf, final QueryUnitAttemptId queryId,
+                            final FragmentProto[] fragments,
+                            final Path workDir) {
+    this.conf = conf;
+    this.queryId = queryId;
+
+    for(FragmentProto t : fragments) {
+      addFragment(t.getId(), t);
+    }
+
+    this.workDir = workDir;
+    this.shuffleFileOutputs = Maps.newHashMap();
+
+    state = TaskAttemptState.TA_PENDING;
+  }
+
+  /**
+   * Same as the main constructor, but takes fragment objects and converts them
+   * to their protobuf form first.
+   */
+  @VisibleForTesting
+  public TaskAttemptContext(TajoConf conf, final QueryUnitAttemptId queryId,
+                            final Fragment [] fragments,  final Path workDir) {
+    this(conf, queryId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
+  }
+
+  /** Appends a fragment to the list kept for the given table id, creating the list on first use. */
+  private void addFragment(String tableId, FragmentProto fragment) {
+    List<FragmentProto> frags = fragmentMap.get(tableId);
+    if (frags == null) {
+      frags = new ArrayList<FragmentProto>();
+      fragmentMap.put(tableId, frags);
+    }
+    frags.add(fragment);
+  }
+
+  public TajoConf getConf() {
+    return this.conf;
+  }
+
+  public TaskAttemptState getState() {
+    return this.state;
+  }
+
+  /** Sets the attempt state and logs the transition. */
+  public void setState(TaskAttemptState state) {
+    this.state = state;
+    LOG.info("Query status of " + getTaskId() + " is changed to " + state);
+  }
+
+  public void setDataChannel(DataChannel dataChannel) {
+    this.dataChannel = dataChannel;
+  }
+
+  public DataChannel getDataChannel() {
+    return dataChannel;
+  }
+
+  public void setEnforcer(Enforcer enforcer) {
+    this.enforcer = enforcer;
+  }
+
+  public Enforcer getEnforcer() {
+    return this.enforcer;
+  }
+
+  public boolean hasResultStats() {
+    return resultStats != null;
+  }
+
+  public void setResultStats(TableStats stats) {
+    this.resultStats = stats;
+  }
+
+  public TableStats getResultStats() {
+    return this.resultStats;
+  }
+
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+  public void setInterQuery() {
+    this.interQuery = true;
+  }
+
+  public void setOutputPath(Path outputPath) {
+    this.outputPath = outputPath;
+  }
+
+  public Path getOutputPath() {
+    return this.outputPath;
+  }
+
+  public boolean isInterQuery() {
+    return this.interQuery;
+  }
+
+  /** Marks this attempt as stopped; the flag is never cleared again. */
+  public void stop() {
+    this.stopped = true;
+  }
+
+  /**
+   * Declares that this attempt has a fetch phase.
+   *
+   * @param count the initial count of the fetch latch
+   * @param fetchIn the directory returned by {@link #getFetchIn()}
+   */
+  public void addFetchPhase(int count, File fetchIn) {
+    this.needFetch = true;
+    this.doneFetchPhaseSignal = new CountDownLatch(count);
+    this.fetchIn = fetchIn;
+  }
+
+  public File getFetchIn() {
+    return this.fetchIn;
+  }
+
+  public boolean hasFetchPhase() {
+    return this.needFetch;
+  }
+
+  /** @return the fetch latch, or null if {@link #addFetchPhase(int, File)} was never called */
+  public CountDownLatch getFetchLatch() {
+    return doneFetchPhaseSignal;
+  }
+
+  public void addShuffleFileOutput(int partId, String fileName) {
+    shuffleFileOutputs.put(partId, fileName);
+  }
+
+  public Iterator<Entry<Integer,String>> getShuffleFileOutputs() {
+    return shuffleFileOutputs.entrySet().iterator();
+  }
+
+  /**
+   * Removes the fragments registered under <code>tableId</code> and then registers
+   * the given fragments, each under its own table name.
+   */
+  public void updateAssignedFragments(String tableId, Fragment[] fragments) {
+    fragmentMap.remove(tableId);
+    for(Fragment t : fragments) {
+      addFragment(t.getTableName(), t.getProto());
+    }
+  }
+
+  public Path getWorkDir() {
+    return this.workDir;
+  }
+
+  public QueryUnitAttemptId getTaskId() {
+    return this.queryId;
+  }
+
+  public float getProgress() {
+    return this.progress;
+  }
+
+  /**
+   * Stores the new progress and records whether it differs from the previous value.
+   * The flag reflects the most recent call only.
+   */
+  public void setProgress(float progress) {
+    float previousProgress = this.progress;
+    this.progress = progress;
+    progressChanged.set(previousProgress != progress);
+  }
+
+  /** @return whether the most recent {@link #setProgress(float)} call changed the value */
+  public boolean isProgressChanged() {
+    return progressChanged.get();
+  }
+
+  /**
+   * Misspelled variant of {@link #isProgressChanged()}, kept so that existing
+   * callers continue to compile.
+   */
+  public boolean isPorgressChanged() {
+    return isProgressChanged();
+  }
+
+  /**
+   * Sets the overall progress from the executor's progress: the executor share is
+   * scaled to the part that is not covered by the fetcher progress.
+   */
+  public void setExecutorProgress(float executorProgress) {
+    float adjustProgress = executorProgress * (1 - fetcherProgress);
+    setProgress(fetcherProgress + adjustProgress);
+  }
+
+  public void setFetcherProgress(float fetcherProgress) {
+    this.fetcherProgress = fetcherProgress;
+  }
+
+  /** @return the first fragment registered for the table id, or null if there is none */
+  public FragmentProto getTable(String id) {
+    List<FragmentProto> frags = fragmentMap.get(id);
+    if (frags == null) {
+      //for empty table
+      return null;
+    }
+    return frags.get(0);
+  }
+
+  /** @return the number of table ids that have fragments registered */
+  public int getFragmentSize() {
+    return fragmentMap.size();
+  }
+
+  public Collection<String> getInputTables() {
+    return fragmentMap.keySet();
+  }
+
+  /** @return all fragments registered for the table id, or null if there are none */
+  public FragmentProto [] getTables(String id) {
+    List<FragmentProto> frags = fragmentMap.get(id);
+    if (frags == null) {
+      //for empty table
+      return null;
+    }
+    return frags.toArray(new FragmentProto[frags.size()]);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(queryId);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof TaskAttemptContext) {
+      TaskAttemptContext other = (TaskAttemptContext) obj;
+      return queryId.equals(other.getTaskId());
+    } else {
+      return false;
+    }
+  }
+}
\ No newline at end of file