You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/14 08:48:02 UTC

[4/8] TAJO-91: Launch QueryMaster on NodeManager per query. (hyoungjunkim via hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
new file mode 100644
index 0000000..871ba77
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -0,0 +1,817 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.tajo.*;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.global.GlobalOptimizer;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.ExprType;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.ipc.QueryMasterManagerProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.master.*;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.rm.RMContainerAllocator;
+import org.apache.tajo.rpc.ProtoAsyncRpcServer;
+import org.apache.tajo.rpc.ProtoBlockingRpcClient;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.util.TajoIdUtils;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+// TODO - when exception, send error status to QueryMasterManager
+public class QueryMaster extends CompositeService implements EventHandler {
+  private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
+  private static PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
+  private static PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+  private static int QUERY_SESSION_TIMEOUT = 60 * 1000;  //60 sec
+
+  // AppMaster Common
+  private final long appSubmitTime;
+  private Clock clock;
+
+  // For Query
+  private final QueryId queryId;
+  private QueryContext queryContext;
+  private Query query;
+  private TajoProtos.QueryState state = TajoProtos.QueryState.QUERY_NOT_ASSIGNED;
+  private String statusMessage;
+  private MasterPlan masterPlan;
+
+  private AsyncDispatcher dispatcher;
+  private RMContainerAllocator rmAllocator;
+
+  //service handler for QueryMasterManager, Worker
+  private QueryMasterService queryMasterService;
+  private QueryMasterClientService queryMasterClientService;
+
+  private TaskRunnerLauncher taskRunnerLauncher;
+  private GlobalPlanner globalPlanner;
+  private GlobalOptimizer globalOptimizer;
+
+  private boolean isCreateTableStmt;
+  private StorageManager storageManager;
+  private Path outputPath;
+  private QueryConf queryConf;
+  private ApplicationAttemptId appAttemptId;
+  private ApplicationId appId;
+  private ProtoBlockingRpcClient queryMasterManagerClient;
+  private QueryMasterManagerProtocol.QueryMasterManagerProtocolService.BlockingInterface queryMasterManagerService;
+
+  private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>();
+
+  private String queryMasterManagerAddress;
+
+  private YarnRPC yarnRPC;
+
+  private YarnClient yarnClient;
+
+  private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread;
+
+  /**
+   * Creates the master for a single query.
+   *
+   * @param queryId id of the query handled by this master; its application id is kept as well
+   * @param appSubmitTime submission time that is later passed to the Query created in startQuery
+   * @param queryMasterManagerAddress address of the QueryMasterManager; resolved in init via
+   *                                  NetUtils.createSocketAddr
+   */
+  public QueryMaster(final QueryId queryId, final long appSubmitTime, String queryMasterManagerAddress) {
+    super(QueryMaster.class.getName());
+
+    this.appSubmitTime = appSubmitTime;
+    this.queryMasterManagerAddress = queryMasterManagerAddress;
+    this.queryId = queryId;
+    this.appId = this.queryId.getApplicationId();
+
+    LOG.info("Created Query Master for " + this.queryId);
+  }
+
+  /**
+   * Initializes this QueryMaster: builds the query configuration and context, starts the YARN
+   * client, opens the RPC client to the QueryMasterManager, creates the dispatcher, planner and
+   * RPC services, prepares the staging directory, registers the event dispatchers and starts the
+   * client-session timeout checker. Any failure is logged and rethrown as a RuntimeException.
+   *
+   * NOTE(review): resources that were already started when a later step fails (YARN client,
+   * RPC client, the RPC server started by QueryMasterService's constructor, the timeout thread)
+   * are not released in the catch block below.
+   *
+   * @param conf base configuration; wrapped in a QueryConf and also passed to super.init
+   */
+  public void init(Configuration conf) {
+    try {
+      queryConf = new QueryConf(conf);
+      // The timeout is fixed to 60 seconds here; the configuration lookup is commented out.
+      QUERY_SESSION_TIMEOUT = 60 * 1000;//queryConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
+      queryContext = new QueryContext(queryConf);
+      yarnRPC = YarnRPC.create(queryContext.getConf());
+      connectYarnClient();
+
+      LOG.info("Init QueryMasterManagerClient connection to:" + queryMasterManagerAddress);
+      InetSocketAddress addr = NetUtils.createSocketAddr(queryMasterManagerAddress);
+      queryMasterManagerClient = new ProtoBlockingRpcClient(QueryMasterManagerProtocol.class, addr);
+      queryMasterManagerService = queryMasterManagerClient.getStub();
+
+      clock = new SystemClock();
+
+      this.dispatcher = new AsyncDispatcher();
+      addIfService(dispatcher);
+
+      this.storageManager = new StorageManager(queryConf);
+
+      globalPlanner = new GlobalPlanner(queryConf, storageManager, dispatcher.getEventHandler());
+      globalOptimizer = new GlobalOptimizer();
+
+      // The QueryMasterService constructor starts its RPC server and writes the bound address
+      // into queryConf, so queryConf must already be set at this point.
+      queryMasterService = new QueryMasterService();
+      addIfService(queryMasterService);
+
+      queryMasterClientService = new QueryMasterClientService(queryContext);
+      addIfService(queryMasterClientService);
+
+      // Reads the configuration through getContext(), so queryContext must already exist.
+      initStagingDir();
+
+      dispatcher.register(SubQueryEventType.class, new SubQueryEventDispatcher());
+      dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+      dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
+      dispatcher.register(QueryFinishEvent.EventType.class, new QueryFinishEventHandler());
+      dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
+
+      clientSessionTimeoutCheckThread = new ClientSessionTimeoutCheckThread();
+
+      // The checker thread is started here, during init, not in start().
+      clientSessionTimeoutCheckThread.start();
+    } catch (Throwable t) {
+      LOG.error(t.getMessage(), t);
+      throw new RuntimeException(t);
+    }
+    // Initializes the child services that were added above.
+    super.init(conf);
+  }
+
+  /**
+   * Checks the last client heartbeat once per second and stops the enclosing QueryMaster when
+   * the heartbeat is older than QUERY_SESSION_TIMEOUT milliseconds. A heartbeat value that is
+   * not positive (the initial value before any client heartbeat was recorded) never triggers
+   * the timeout. The thread ends when it is interrupted or after it has stopped the master.
+   */
+  class ClientSessionTimeoutCheckThread extends Thread {
+    @Override
+    public void run() {
+      LOG.info("ClientSessionTimeoutCheckThread started");
+      while(true) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          // Interrupted (QueryMasterService.stop() does this): end the checker.
+          break;
+        }
+        try {
+          long lastHeartbeat = queryContext.getLastClientHeartbeat();
+          long time = System.currentTimeMillis() - lastHeartbeat;
+          if(lastHeartbeat > 0 && time > QUERY_SESSION_TIMEOUT) {
+            LOG.warn("Query " + queryId + " stopped cause query sesstion timeout: " + time + " ms");
+            QueryMaster.this.stop();
+            // The master has been stopped; leave the loop explicitly instead of relying on
+            // an interrupt, so that stop() is not invoked again on every following tick.
+            break;
+          }
+        } catch (Exception e) {
+          // A failed check or a failed stop is logged and retried on the next tick.
+          LOG.error(e.getMessage(), e);
+        }
+      }
+    }
+  }
+
+  class QueryHeartbeatThread extends Thread {
+    public QueryHeartbeatThread() {
+      super("QueryHeartbeatThread");
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Start QueryMaster heartbeat thread");
+      while(queryMasterManagerClient.isConnected()) {
+        QueryMasterManagerProtocol.QueryHeartbeat queryHeartbeat =
+            QueryMasterManagerProtocol.QueryHeartbeat.newBuilder()
+                .setQueryMasterHost(queryMasterService.bindAddr.getHostName())
+                .setQueryMasterPort(queryMasterService.bindAddr.getPort())
+                .setQueryMasterClientPort(queryMasterClientService.getBindAddr().getPort())
+                .setState(state)
+                .setQueryId(queryId.getProto())
+                .build();
+
+        try {
+          QueryMasterManagerProtocol.QueryHeartbeatResponse response =
+              queryMasterManagerService.queryHeartbeat(null, queryHeartbeat);
+          if(response.getResponseCommand() != null) {
+            if("executeQuery".equals(response.getResponseCommand().getCommand())) {
+              appAttemptId = TajoIdUtils.toApplicationAttemptId(response.getResponseCommand().getParams(0));
+              startQuery(response.getResponseCommand().getParams(1),
+                  response.getResponseCommand().getParams(2));
+            }
+          }
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        }
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          break;
+        }
+      }
+      LOG.info("QueryMaster heartbeat thread stopped");
+    }
+  }
+
+  // TODO blocking/nonblocking ???
+  /**
+   * RPC endpoint of the QueryMaster. The constructor starts an asynchronous RPC server on the
+   * local host with an automatically chosen port and stores the bound address in the query
+   * configuration; starting the service launches the heartbeat thread. Incoming task reports
+   * are turned into events and passed to the query context's event handler.
+   */
+  class QueryMasterService extends AbstractService implements QueryMasterProtocol.QueryMasterProtocolService.Interface {
+    private ProtoAsyncRpcServer rpcServer;
+    private InetSocketAddress bindAddr;
+    private String addr;
+    private QueryHeartbeatThread queryHeartbeatThread;
+
+    public QueryMasterService() {
+      super(QueryMasterService.class.getName());
+
+      // Setup RPC server
+      try {
+        InetAddress localHost = InetAddress.getLocalHost();
+        InetSocketAddress initIsa = new InetSocketAddress(localHost, 0);
+        if (initIsa.getAddress() == null) {
+          throw new IllegalArgumentException("Failed resolve of " + initIsa);
+        }
+
+        rpcServer = new ProtoAsyncRpcServer(QueryMasterProtocol.class, this, initIsa);
+        rpcServer.start();
+
+        bindAddr = rpcServer.getBindAddress();
+        addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+      } catch (Exception e) {
+        // The failure is only logged; bindAddr and addr stay null in that case.
+        LOG.error(e.getMessage(), e);
+      }
+      // Get the master address
+      LOG.info(QueryMasterService.class.getSimpleName() + " is bind to " + addr);
+      queryConf.setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
+    }
+
+    @Override
+    public void init(Configuration conf) {
+      super.init(conf);
+    }
+
+    /** Launches the heartbeat thread, then marks the service as started. */
+    @Override
+    public void start() {
+      try {
+        QueryHeartbeatThread heartbeatThread = new QueryHeartbeatThread();
+        queryHeartbeatThread = heartbeatThread;
+        heartbeatThread.start();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        // TODO - set query status failed and stop QueryMaster
+      }
+      super.start();
+    }
+
+    /**
+     * Shuts down the RPC server, interrupts the heartbeat thread, stops the YARN client and
+     * interrupts the client-session timeout checker of the enclosing QueryMaster.
+     */
+    @Override
+    public void stop() {
+      if (rpcServer != null) {
+        rpcServer.shutdown();
+      }
+      if (queryHeartbeatThread != null) {
+        queryHeartbeatThread.interrupt();
+      }
+      if (yarnClient != null) {
+        yarnClient.stop();
+      }
+      if (clientSessionTimeoutCheckThread != null) {
+        clientSessionTimeoutCheckThread.interrupt();
+      }
+      LOG.info("QueryMasterService stopped");
+      super.stop();
+    }
+
+    /** Forwards a task request for the given container; the callback travels with the event. */
+    @Override
+    public void getTask(RpcController controller, YarnProtos.ContainerIdProto request,
+                        RpcCallback<QueryMasterProtocol.QueryUnitRequestProto> done) {
+      EventHandler handler = queryContext.getEventHandler();
+      handler.handle(new TaskRequestEvent(new ContainerIdPBImpl(request), done));
+    }
+
+    /** Forwards a task attempt status update and acknowledges it with TRUE. */
+    @Override
+    public void statusUpdate(RpcController controller, QueryMasterProtocol.TaskStatusProto request,
+                             RpcCallback<PrimitiveProtos.BoolProto> done) {
+      QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
+      EventHandler handler = queryContext.getEventHandler();
+      handler.handle(new TaskAttemptStatusUpdateEvent(attemptId, request));
+      done.run(TRUE_PROTO);
+    }
+
+    /** Acknowledges a ping with TRUE; the attempt id is not used yet. */
+    @Override
+    public void ping(RpcController controller,
+                     TajoIdProtos.QueryUnitAttemptIdProto attemptIdProto,
+                     RpcCallback<PrimitiveProtos.BoolProto> done) {
+      // TODO - to be completed
+      QueryUnitAttemptId attemptId = new QueryUnitAttemptId(attemptIdProto);
+      done.run(TRUE_PROTO);
+    }
+
+    /** Forwards a fatal error report and acknowledges it with TRUE. */
+    @Override
+    public void fatalError(RpcController controller, QueryMasterProtocol.TaskFatalErrorReport report,
+                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+      EventHandler handler = queryContext.getEventHandler();
+      handler.handle(new TaskFatalErrorEvent(report));
+      done.run(TRUE_PROTO);
+    }
+
+    /** Forwards a task completion report and acknowledges it with TRUE. */
+    @Override
+    public void done(RpcController controller, QueryMasterProtocol.TaskCompletionReport report,
+                     RpcCallback<PrimitiveProtos.BoolProto> done) {
+      EventHandler handler = queryContext.getEventHandler();
+      handler.handle(new TaskCompletionEvent(report));
+      done.run(TRUE_PROTO);
+    }
+
+    /** Not implemented: the request is ignored and the callback is never invoked. */
+    @Override
+    public void executeQuery(RpcController controller, PrimitiveProtos.StringProto request,
+                             RpcCallback<PrimitiveProtos.BoolProto> done) {
+    }
+  }
+
+  /** Starts this service by delegating to the parent implementation. */
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  /**
+   * Stops this QueryMaster. If a container allocator exists, the application master is
+   * unregistered with a final status derived from the query state (SUCCEEDED, FAILED for
+   * QUERY_FAILED or QUERY_ERROR, otherwise UNDEFINED). Afterwards the final query status is
+   * reported to the QueryMasterManager and its client is closed, all cached file systems are
+   * closed, the child services are stopped and every thread waiting on the query id is notified.
+   * Failures while unregistering or closing file systems are logged and do not abort the stop.
+   */
+  public void stop() {
+    LOG.info("unregisterApplicationMaster");
+    if(rmAllocator != null) {
+      try {
+        FinalApplicationStatus status = FinalApplicationStatus.UNDEFINED;
+        if (query != null) {
+          // Named differently from the 'state' field to avoid shadowing it.
+          TajoProtos.QueryState finalState = query.getState();
+          if (finalState == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+            status = FinalApplicationStatus.SUCCEEDED;
+          } else if (finalState == TajoProtos.QueryState.QUERY_FAILED
+              || finalState == TajoProtos.QueryState.QUERY_ERROR) {
+            status = FinalApplicationStatus.FAILED;
+          }
+        }
+        this.rmAllocator.unregisterApplicationMaster(status, "tajo query finished", null);
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
+    // TODO - release opened resource
+    if(this.queryMasterManagerClient != null) {
+      reportQueryStatus();
+
+      queryMasterManagerClient.close();
+    }
+
+    try {
+      FileSystem.closeAll();
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+    }
+
+    super.stop();
+
+    // Wake up whoever is waiting on the query id for this master to finish.
+    synchronized(queryId) {
+      queryId.notifyAll();
+    }
+  }
+
+  /**
+   * Sends one heartbeat carrying the current state (and the status message, if any) to the
+   * QueryMasterManager. This runs as part of stop(), so every failure is logged instead of
+   * propagated: that includes a failure while building the message, e.g. when the RPC services
+   * or their bind addresses were never set up. The remaining shutdown steps can then still run.
+   */
+  private void reportQueryStatus() {
+    try {
+      //send query status heartbeat
+      QueryMasterManagerProtocol.QueryHeartbeat.Builder queryHeartbeatBuilder =
+          QueryMasterManagerProtocol.QueryHeartbeat.newBuilder()
+          .setQueryMasterHost(queryMasterService.bindAddr.getHostName())
+          .setQueryMasterPort(queryMasterService.bindAddr.getPort())
+          .setQueryMasterClientPort(queryMasterClientService.getBindAddr().getPort())
+          .setState(state)
+          .setQueryId(queryId.getProto());
+
+      if(statusMessage != null) {
+        queryHeartbeatBuilder.setStatusMessage(statusMessage);
+      }
+      queryMasterManagerService.queryHeartbeat(null, queryHeartbeatBuilder.build());
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /** Creates the YARN client, then initializes it with the query configuration and starts it. */
+  private void connectYarnClient() {
+    YarnClient client = new YarnClientImpl();
+    // Publish the field before init/start, exactly as the client is created.
+    this.yarnClient = client;
+    client.init(queryConf);
+    client.start();
+  }
+
+  /**
+   * Registers the given object as a child service when it is a Service; any other object
+   * (including null) is ignored.
+   *
+   * @param object candidate child service
+   */
+  protected void addIfService(Object object) {
+    if (!(object instanceof Service)) {
+      return;
+    }
+    Service service = (Service) object;
+    addService(service);
+  }
+
+  /**
+   * Starts the query unless one has already been created. The JSON plan is parsed, the table
+   * descriptors of all scan nodes are recorded, the global plan is built and optimized, the
+   * task runner launcher and the container allocator are created, initialized and started, and
+   * finally the Query is created and sent the INIT and START events. Any exception is logged
+   * and recorded as QUERY_FAILED together with its stack trace as status message.
+   *
+   * @param queryStr query text; only logged here
+   * @param planJSON logical plan in JSON form
+   */
+  public synchronized void startQuery(String queryStr, String planJSON) {
+    LOG.info("Query Start:" + queryStr);
+    LOG.info("Plan JSON:" + planJSON);
+    if (query != null) {
+      LOG.warn("Query already started");
+      return;
+    }
+
+    try {
+      LogicalRootNode rootNode = (LogicalRootNode) CoreGsonHelper.fromJson(planJSON, LogicalNode.class);
+
+      // Remember the table descriptor of every scanned table.
+      LogicalNode[] scans = PlannerUtil.findAllNodes(rootNode, ExprType.SCAN);
+      if (scans != null) {
+        for (int i = 0; i < scans.length; i++) {
+          ScanNode scan = (ScanNode) scans[i];
+          tableDescMap.put(scan.getFromTable().getTableName(), scan.getFromTable().getTableDesc());
+        }
+      }
+
+      MasterPlan unoptimizedPlan = globalPlanner.build(queryId, rootNode);
+      this.masterPlan = globalOptimizer.optimize(unoptimizedPlan);
+
+      // Task runner launcher: register, then init and start it right away.
+      TaskRunnerLauncherImpl launcher = new TaskRunnerLauncherImpl(queryContext);
+      taskRunnerLauncher = launcher;
+      addIfService(launcher);
+      dispatcher.register(TaskRunnerGroupEvent.EventType.class, launcher);
+      launcher.init(queryConf);
+      launcher.start();
+
+      // Container allocator: same sequence.
+      rmAllocator = new RMContainerAllocator(queryContext);
+      addIfService(rmAllocator);
+      dispatcher.register(ContainerAllocatorEventType.class, rmAllocator);
+      rmAllocator.init(queryConf);
+      rmAllocator.start();
+
+      //TODO - synchronized with executeQuery logic
+      query = new Query(queryContext, queryId, clock, appSubmitTime,
+          "", dispatcher.getEventHandler(), masterPlan, storageManager);
+      dispatcher.register(QueryEventType.class, query);
+
+      QueryEvent initEvent = new QueryEvent(queryId, QueryEventType.INIT);
+      dispatcher.getEventHandler().handle(initEvent);
+      QueryEvent startEvent = new QueryEvent(queryId, QueryEventType.START);
+      dispatcher.getEventHandler().handle(startEvent);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      //send FAIL query status
+      this.statusMessage = StringUtils.stringifyException(e);
+      this.state = TajoProtos.QueryState.QUERY_FAILED;
+    }
+  }
+
+  @Override
+  public void handle(Event event) {
+    dispatcher.getEventHandler().handle(event);
+  }
+
+  public EventHandler getEventHandler() {
+    return dispatcher.getEventHandler();
+  }
+
+  /** Routes a sub-query event to the SubQuery identified by the event. */
+  private class SubQueryEventDispatcher implements EventHandler<SubQueryEvent> {
+    @Override
+    public void handle(SubQueryEvent event) {
+      SubQuery target = query.getSubQuery(event.getSubQueryId());
+      target.handle(event);
+    }
+  }
+
+  private class TaskEventDispatcher
+      implements EventHandler<TaskEvent> {
+    public void handle(TaskEvent event) {
+      QueryUnitId taskId = event.getTaskId();
+      QueryUnit task = query.getSubQuery(taskId.getSubQueryId()).
+          getQueryUnit(taskId);
+      task.handle(event);
+    }
+  }
+
+  private class TaskAttemptEventDispatcher
+      implements EventHandler<TaskAttemptEvent> {
+    public void handle(TaskAttemptEvent event) {
+      QueryUnitAttemptId attemptId = event.getTaskAttemptId();
+      SubQuery subQuery = query.getSubQuery(attemptId.getSubQueryId());
+      QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
+      QueryUnitAttempt attempt = task.getAttempt(attemptId);
+      attempt.handle(event);
+    }
+  }
+
+  private class TaskSchedulerDispatcher
+      implements EventHandler<TaskSchedulerEvent> {
+    public void handle(TaskSchedulerEvent event) {
+      SubQuery subQuery = query.getSubQuery(event.getSubQueryId());
+      subQuery.getTaskScheduler().handle(event);
+    }
+  }
+
+  /** @return the query context created in init, or null before init has run */
+  public QueryContext getContext() {
+    return queryContext;
+  }
+
+  /**
+   * View of the enclosing QueryMaster that is handed to the services and event handlers
+   * working on the query. Most accessors read fields of the enclosing QueryMaster; the
+   * container map, the container capabilities, the cluster size and the time of the last
+   * client heartbeat are kept in this object.
+   */
+  public class QueryContext {
+    private QueryConf conf;
+    public Map<ContainerId, ContainerProxy> containers = new ConcurrentHashMap<ContainerId, ContainerProxy>();
+    int minCapability;
+    int maxCapability;
+    int numCluster;
+    AtomicLong lastClientHeartbeat = new AtomicLong(-1);
+
+    public QueryContext(QueryConf conf) {
+      this.conf = conf;
+    }
+
+    // ---- configuration and services of the enclosing QueryMaster ----
+
+    public QueryConf getConf() {
+      return this.conf;
+    }
+
+    public InetSocketAddress getQueryMasterServiceAddress() {
+      return QueryMaster.this.queryMasterService.bindAddr;
+    }
+
+    public QueryMasterClientService getQueryMasterClientService() {
+      return QueryMaster.this.queryMasterClientService;
+    }
+
+    public AsyncDispatcher getDispatcher() {
+      return QueryMaster.this.dispatcher;
+    }
+
+    public Clock getClock() {
+      return QueryMaster.this.clock;
+    }
+
+    // ---- query and identifiers ----
+
+    public Query getQuery() {
+      return QueryMaster.this.query;
+    }
+
+    public SubQuery getSubQuery(SubQueryId subQueryId) {
+      return QueryMaster.this.query.getSubQuery(subQueryId);
+    }
+
+    public QueryId getQueryId() {
+      return QueryMaster.this.queryId;
+    }
+
+    public ApplicationId getApplicationId() {
+      return QueryMaster.this.appId;
+    }
+
+    public ApplicationAttemptId getApplicationAttemptId() {
+      return QueryMaster.this.appAttemptId;
+    }
+
+    public EventHandler getEventHandler() {
+      return QueryMaster.this.dispatcher.getEventHandler();
+    }
+
+    // ---- containers ----
+
+    public void addContainer(ContainerId cId, ContainerProxy container) {
+      this.containers.put(cId, container);
+    }
+
+    public void removeContainer(ContainerId cId) {
+      this.containers.remove(cId);
+    }
+
+    public boolean containsContainer(ContainerId cId) {
+      return this.containers.containsKey(cId);
+    }
+
+    public ContainerProxy getContainer(ContainerId cId) {
+      return this.containers.get(cId);
+    }
+
+    /** @return the live container map itself, not a copy */
+    public Map<ContainerId, ContainerProxy> getContainers() {
+      return this.containers;
+    }
+
+    public int getNumClusterNode() {
+      return this.numCluster;
+    }
+
+    public void setNumClusterNodes(int num) {
+      this.numCluster = num;
+    }
+
+//    public CatalogService getCatalog() {
+//      return catalog;
+//    }
+
+    /** @return the enclosing QueryMaster's table descriptor map itself, not a copy */
+    public Map<String, TableDesc> getTableDescMap() {
+      return QueryMaster.this.tableDescMap;
+    }
+
+    public Path getOutputPath() {
+      return QueryMaster.this.outputPath;
+    }
+
+    public void setMaxContainerCapability(int capability) {
+      this.maxCapability = capability;
+    }
+
+    public int getMaxContainerCapability() {
+      return this.maxCapability;
+    }
+
+    public void setMinContainerCapability(int capability) {
+      this.minCapability = capability;
+    }
+
+    public int getMinContainerCapability() {
+      return this.minCapability;
+    }
+
+    public boolean isCreateTableQuery() {
+      return QueryMaster.this.isCreateTableStmt;
+    }
+
+    // ---- progress and timing; defaults apply while no Query exists ----
+
+    /** @return the query's progress, or 0 while no query exists */
+    public float getProgress() {
+      return query != null ? query.getProgress() : 0;
+    }
+
+    /** @return the query's start time, or -1 while no query exists */
+    public long getStartTime() {
+      return query != null ? query.getStartTime() : -1;
+    }
+
+    /** @return the query's finish time, or -1 while no query exists */
+    public long getFinishTime() {
+      return query != null ? query.getFinishTime() : -1;
+    }
+
+    public StorageManager getStorageManager() {
+      return QueryMaster.this.storageManager;
+    }
+
+    public QueryMaster getQueryMaster() {
+      return QueryMaster.this;
+    }
+
+    public YarnRPC getYarnRPC() {
+      return QueryMaster.this.yarnRPC;
+    }
+
+    /** Sets the state that the enclosing QueryMaster reports in its heartbeats. */
+    public void setState(TajoProtos.QueryState state) {
+      QueryMaster.this.state = state;
+    }
+
+    public TajoProtos.QueryState getState() {
+      return QueryMaster.this.state;
+    }
+
+    /** Records the current time as the time of the last client heartbeat. */
+    public void touchSessionTime() {
+      long now = System.currentTimeMillis();
+      this.lastClientHeartbeat.set(now);
+    }
+
+    /** @return the time of the last client heartbeat, or -1 if none was recorded */
+    public long getLastClientHeartbeat() {
+      return this.lastClientHeartbeat.get();
+    }
+  }
+
+  private class QueryFinishEventHandler implements EventHandler<QueryFinishEvent> {
+    @Override
+    public void handle(QueryFinishEvent event) {
+      LOG.info("Query end notification started for QueryId : " + query.getId() + "," + query.getState());
+
+      //QueryMaster must be lived until client fetching all query result data.
+      try {
+        // Stop all services
+        // This will also send the final report to the ResourceManager
+        //LOG.info("Calling stop for all the services");
+//        stop();
+      } catch (Throwable t) {
+        LOG.warn("Graceful stop failed ", t);
+      }
+
+      //Bring the process down by force.
+      //Not needed after HADOOP-7140
+      //LOG.info("Exiting QueryMaster..GoodBye!");
+    }
+  }
+
+  // query submission directory is private!
+  final public static FsPermission USER_DIR_PERMISSION =
+      FsPermission.createImmutable((short) 0700); // rwx--------
+
+  /**
+   * It initializes the final output and staging directory and sets
+   * them to variables.
+   *
+   * When the configuration names no output table, the staging directory is
+   * {@code <home>/<USER_QUERYDIR_PREFIX>/<queryId>} on the default file system and the query id
+   * becomes the output table name. The user's query directory is created if it is missing;
+   * if it exists, its owner must be the current or the login user, and its permission is
+   * reset to USER_DIR_PERMISSION when it differs. When an output table is named, the staging
+   * directory is {@code <ROOT_DIR>/<WAREHOUSE_DIR>/<outputTable>}. In both cases the staging
+   * directory must not exist yet and is created with USER_DIR_PERMISSION.
+   *
+   * @throws IOException if the query directory has an unexpected owner, if the staging
+   *                     directory already exists, or if a file system operation fails
+   */
+  private void initStagingDir() throws IOException {
+    QueryConf conf = getContext().getConf();
+
+    String realUser = UserGroupInformation.getLoginUser().getShortUserName();
+    String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+
+    String givenOutputTableName = conf.getOutputTable();
+    Path stagingDir;
+
+    // If final output directory is not given by an user,
+    // we use the query id as a output directory.
+    if (givenOutputTableName.equals("")) {
+      this.isCreateTableStmt = false;
+      FileSystem defaultFS = FileSystem.get(conf);
+
+      Path homeDirectory = defaultFS.getHomeDirectory();
+      if (!defaultFS.exists(homeDirectory)) {
+        defaultFS.mkdirs(homeDirectory, new FsPermission(USER_DIR_PERMISSION));
+      }
+
+      Path userQueryDir = new Path(homeDirectory, TajoConstants.USER_QUERYDIR_PREFIX);
+
+      if (defaultFS.exists(userQueryDir)) {
+        FileStatus fsStatus = defaultFS.getFileStatus(userQueryDir);
+        String owner = fsStatus.getOwner();
+
+        if (!(owner.equals(currentUser) || owner.equals(realUser))) {
+          throw new IOException("The ownership on the user's query " +
+              "directory " + userQueryDir + " is not as expected. " +
+              "It is owned by " + owner + ". The directory must " +
+              "be owned by the submitter " + currentUser + " or " +
+              "by " + realUser);
+        }
+
+        if (!fsStatus.getPermission().equals(USER_DIR_PERMISSION)) {
+          LOG.info("Permissions on staging directory " + userQueryDir + " are " +
+              "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
+              "to correct value " + USER_DIR_PERMISSION);
+          defaultFS.setPermission(userQueryDir, new FsPermission(USER_DIR_PERMISSION));
+        }
+      } else {
+        defaultFS.mkdirs(userQueryDir,
+            new FsPermission(USER_DIR_PERMISSION));
+      }
+
+      stagingDir = StorageUtil.concatPath(userQueryDir, queryId.toString());
+      createUniqueStagingDir(defaultFS, stagingDir);
+
+      // Set the query id to the output table name
+      conf.setOutputTable(queryId.toString());
+
+    } else {
+      this.isCreateTableStmt = true;
+      Path warehouseDir = new Path(conf.getVar(TajoConf.ConfVars.ROOT_DIR),
+          TajoConstants.WAREHOUSE_DIR);
+      stagingDir = new Path(warehouseDir, conf.getOutputTable());
+
+      FileSystem fs = warehouseDir.getFileSystem(conf);
+      // TODO - should have appropriate permission
+      createUniqueStagingDir(fs, stagingDir);
+    }
+
+    conf.setOutputPath(stagingDir);
+    outputPath = stagingDir;
+    LOG.info("Initialized Query Staging Dir: " + outputPath);
+  }
+
+  /**
+   * Creates the staging directory with USER_DIR_PERMISSION, failing if it already exists.
+   *
+   * @param fs file system that holds the staging directory
+   * @param stagingDir directory to create; must be unique to the query
+   * @throws IOException if the directory already exists or the file system operation fails
+   */
+  private static void createUniqueStagingDir(FileSystem fs, Path stagingDir) throws IOException {
+    if (fs.exists(stagingDir)) {
+      throw new IOException("The staging directory " + stagingDir
+          + " already exists. The directory must be unique to each query");
+    }
+    fs.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
new file mode 100644
index 0000000..1a326fe
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.QueryMasterClientProtocol;
+import org.apache.tajo.rpc.ProtoBlockingRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.TajoIdUtils;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * Client-facing RPC service of a single QueryMaster.
+ *
+ * <p>The service starts a {@link ProtoBlockingRpcServer} on an ephemeral port
+ * of the local host and answers {@link QueryMasterClientProtocol} calls (query
+ * status, query result, kill) for the query held by the given
+ * {@link QueryMaster.QueryContext}.</p>
+ */
+public class QueryMasterClientService extends AbstractService {
+  private static final Log LOG = LogFactory.getLog(QueryMasterClientService.class);
+  private final PrimitiveProtos.BoolProto BOOL_TRUE =
+          PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
+
+  private ProtoBlockingRpcServer rpcServer;
+  /** Address the RPC server is actually bound to; stays null if the server failed to start. */
+  private InetSocketAddress bindAddr;
+  /** "host:port" form of {@link #bindAddr}, used for logging only. */
+  private String addr;
+  private QueryMaster.QueryContext queryContext;
+  private QueryMasterClientProtocolServiceHandler serviceHandler;
+
+  /**
+   * Creates the service and immediately starts its RPC server.
+   *
+   * <p>The server is started here rather than in {@link #start()} because,
+   * according to the original author's note, the heartbeat thread needs the
+   * bind address. A startup failure is logged and not rethrown; in that case
+   * {@link #getBindAddr()} returns null.</p>
+   *
+   * @param queryContext context of the query served by this QueryMaster
+   */
+  public QueryMasterClientService(QueryMaster.QueryContext queryContext) {
+    super(QueryMasterClientService.class.getName());
+
+    this.queryContext = queryContext;
+    this.serviceHandler = new QueryMasterClientProtocolServiceHandler();
+
+    // init RPC Server in constructor cause Heartbeat Thread use bindAddr
+    // Setup RPC server
+    try {
+      // TODO initial port num is value of config and find unused port with sequence
+      // Port 0 lets the OS pick any free port.
+      InetSocketAddress initIsa = new InetSocketAddress(InetAddress.getLocalHost(), 0);
+      if (initIsa.getAddress() == null) {
+        throw new IllegalArgumentException("Failed resolve of " + initIsa);
+      }
+
+      // TODO blocking/non-blocking??
+      this.rpcServer = new ProtoBlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa);
+      this.rpcServer.start();
+
+      this.bindAddr = rpcServer.getBindAddress();
+      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+    // Get the master address
+    LOG.info(QueryMasterClientService.class.getSimpleName() + " is bind to " + addr);
+    //queryConf.setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  /** Shuts down the RPC server (if it was created) and stops the service. */
+  @Override
+  public void stop() {
+    if(rpcServer != null) {
+      rpcServer.shutdown();
+    }
+    LOG.info("QueryMasterClientService stopped");
+    super.stop();
+  }
+
+  /**
+   * @return the address the RPC server is bound to, or null if the server
+   *         could not be started in the constructor
+   */
+  public InetSocketAddress getBindAddr() {
+    return bindAddr;
+  }
+
+
+  /** Blocking implementation of the client protocol for the query of the enclosing service. */
+  public class QueryMasterClientProtocolServiceHandler
+          implements QueryMasterClientProtocol.QueryMasterClientProtocolService.BlockingInterface {
+    /**
+     * Not implemented yet: always returns null.
+     * NOTE(review): confirm that the RPC layer tolerates a null response.
+     */
+    @Override
+    public PrimitiveProtos.BoolProto updateSessionVariables(
+            RpcController controller,
+            ClientProtos.UpdateSessionVariableRequest request) throws ServiceException {
+      return null;
+    }
+
+    /**
+     * Returns the result table descriptor of a succeeded query, or an error
+     * message when there is no query, the query failed, or it has not
+     * finished yet.
+     */
+    @Override
+    public ClientProtos.GetQueryResultResponse getQueryResult(
+            RpcController controller,
+            ClientProtos.GetQueryResultRequest request) throws ServiceException {
+      QueryId queryId = new QueryId(request.getQueryId());
+      Query query = queryContext.getQuery();
+
+      ClientProtos.GetQueryResultResponse.Builder builder = ClientProtos.GetQueryResultResponse.newBuilder();
+
+      if(query == null) {
+        builder.setErrorMessage("No Query for " + queryId);
+      } else {
+        switch (query.getState()) {
+          case QUERY_SUCCEEDED:
+            builder.setTableDesc((CatalogProtos.TableDescProto)query.getResultDesc().getProto());
+            break;
+          case QUERY_FAILED:
+          case QUERY_ERROR:
+            builder.setErrorMessage("Query " + queryId + " is failed");
+            // Without this break the failure message was overwritten by the
+            // "still running" message of the default branch.
+            break;
+          default:
+            builder.setErrorMessage("Query " + queryId + " is still running");
+            break;
+        }
+      }
+      return builder.build();
+    }
+
+    /**
+     * Reports the current status of the query.
+     *
+     * <p>A request for the null query id is answered with OK /
+     * QUERY_SUCCEEDED right away. Otherwise the response carries this
+     * QueryMaster's host and port and, when the query object exists, its
+     * state, progress and timestamps; if it does not exist yet, the state of
+     * the query context is reported instead. Each such call also refreshes
+     * the session time of the query context.</p>
+     */
+    @Override
+    public ClientProtos.GetQueryStatusResponse getQueryStatus(
+            RpcController controller,
+            ClientProtos.GetQueryStatusRequest request) throws ServiceException {
+      ClientProtos.GetQueryStatusResponse.Builder builder
+              = ClientProtos.GetQueryStatusResponse.newBuilder();
+      QueryId queryId = new QueryId(request.getQueryId());
+      builder.setQueryId(request.getQueryId());
+
+      if (queryId.equals(TajoIdUtils.NullQueryId)) {
+        builder.setResultCode(ClientProtos.ResultCode.OK);
+        builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
+      } else {
+        Query query = queryContext.getQuery();
+        builder.setResultCode(ClientProtos.ResultCode.OK);
+        builder.setQueryMasterHost(queryContext.getQueryMasterClientService().getBindAddr().getHostName());
+        builder.setQueryMasterPort(queryContext.getQueryMasterClientService().getBindAddr().getPort());
+
+
+        queryContext.touchSessionTime();
+        if (query != null) {
+          builder.setState(query.getState());
+          builder.setProgress(query.getProgress());
+          builder.setSubmitTime(query.getAppSubmitTime());
+          builder.setInitTime(query.getInitializationTime());
+          builder.setHasResult(!query.isCreateTableStmt());
+          if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+            builder.setFinishTime(query.getFinishTime());
+          } else {
+            // Any state other than succeeded reports "now" as the finish time.
+            builder.setFinishTime(System.currentTimeMillis());
+          }
+        } else {
+          builder.setState(queryContext.getState());
+        }
+      }
+
+      return builder.build();
+    }
+
+    /**
+     * Stops the QueryMaster asynchronously and acknowledges the request.
+     *
+     * <p>The stop is performed by a separate thread after a one second delay
+     * so that this RPC call can return its response first.</p>
+     */
+    @Override
+    public PrimitiveProtos.BoolProto killQuery(
+            RpcController controller,
+            YarnProtos.ApplicationAttemptIdProto request) throws ServiceException {
+      LOG.info("Stop QueryMaster:" + queryContext.getQueryId());
+      Thread t = new Thread() {
+        @Override
+        public void run() {
+          try {
+            Thread.sleep(1000);   // wait until the RPC response has been returned
+          } catch (InterruptedException ignored) {
+            // Deliberately ignored: an interrupt only shortens the grace
+            // period, the QueryMaster is stopped right away.
+          }
+          queryContext.getQueryMaster().stop();
+        }
+      };
+      t.start();
+      return BOOL_TRUE;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java
new file mode 100644
index 0000000..47adf7d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.ipc.QueryMasterManagerProtocol;
+import org.apache.tajo.ipc.QueryMasterManagerProtocol.QueryHeartbeatResponse;
+import org.apache.tajo.master.ContainerProxy;
+import org.apache.tajo.master.TajoMaster;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+// TODO - check QueryMaster status and if QueryMaster failed, release resource
+/**
+ * Master-side handle of one query: submits a QueryMaster application to YARN,
+ * hands the query over to it on the first heartbeat and tracks the state the
+ * QueryMaster reports back.
+ */
+public class QueryMasterManager extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(QueryMasterManager.class.getName());
+
+  // Master Context
+  private final TajoMaster.MasterContext masterContext;
+
+  // AppMaster Common
+  private final Clock clock;
+  private final long appSubmitTime;
+  private final ApplicationId appId;
+  /** Set by {@link #start()}; stays null when launching the QueryMaster failed. */
+  private ApplicationAttemptId appAttemptId;
+
+  protected YarnClient yarnClient;
+
+  // For Query
+  private final QueryId queryId;
+
+  private AsyncDispatcher dispatcher;
+  private YarnRPC rpc;
+
+  private TajoProtos.QueryState state;
+  private float progress;
+  private long finishTime;
+  private TableDesc resultDesc;
+  private String queryMasterHost;
+  private int queryMasterPort;
+  private int queryMasterClientPort;
+
+  private LogicalRootNode plan;
+
+  /** True once the executeQuery command has been handed to the QueryMaster. */
+  private AtomicBoolean querySubmitted = new AtomicBoolean(false);
+
+  /**
+   * Starts as true, becomes false when the query is submitted and true again
+   * when the QueryMaster is known to have stopped. {@link #stop()} waits on it.
+   */
+  private AtomicBoolean queryMasterStopped = new AtomicBoolean(true);
+
+  /** Guards against starting the stop-check thread more than once. */
+  private boolean stopCheckThreadStarted = false;
+
+  private String query;
+
+  /**
+   * @param masterContext context of the TajoMaster
+   * @param yarnClient    client used to submit and monitor the YARN application
+   * @param queryId       id of the query
+   * @param query         query string sent to the QueryMaster
+   * @param plan          logical plan sent to the QueryMaster as JSON
+   * @param appId         YARN application id of the QueryMaster
+   * @param clock         clock of the master
+   * @param appSubmitTime submission time passed to the QueryMaster process
+   */
+  public QueryMasterManager(final TajoMaster.MasterContext masterContext,
+                     final YarnClient yarnClient,
+                     final QueryId queryId,
+                     final String query,
+                     final LogicalRootNode plan,
+                     final ApplicationId appId,
+                     final Clock clock, long appSubmitTime) {
+    super(QueryMasterManager.class.getName());
+    this.masterContext = masterContext;
+    this.yarnClient = yarnClient;
+
+    this.appId = appId;
+    this.clock = clock;
+    this.appSubmitTime = appSubmitTime;
+    this.queryId = queryId;
+    this.plan = plan;
+    this.query = query;
+    LOG.info("Created Query Master Manager for AppId=" + appId + ", QueryID=" + queryId);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+
+    state = TajoProtos.QueryState.QUERY_MASTER_INIT;
+  }
+
+  public TajoProtos.QueryState getState() {
+    return state;
+  }
+
+  /**
+   * Submits the QueryMaster application to YARN and starts the service.
+   *
+   * <p>A launch failure is logged and the query is marked as failed, so that
+   * it does not stay in the QUERY_MASTER_INIT state forever.</p>
+   */
+  @Override
+  public void start() {
+    try {
+      appAttemptId = allocateAndLaunchQueryMaster();
+    } catch (YarnRemoteException e) {
+      LOG.error(e.getMessage(), e);
+      // No QueryMaster will ever send a heartbeat for this query.
+      state = TajoProtos.QueryState.QUERY_FAILED;
+    }
+    super.start();
+  }
+
+  /**
+   * Blocks until the QueryMaster is known to have stopped, then stops the
+   * service. An interrupt does not abort the wait; the interrupt status is
+   * restored before this method returns.
+   */
+  @Override
+  public void stop() {
+    boolean interrupted = false;
+    try {
+      while (!queryMasterStopped.get()) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          // Keep waiting, but remember the interrupt for the caller.
+          interrupted = true;
+        }
+      }
+      LOG.info("QueryMasterManager for " + queryId + " stopped");
+      super.stop();
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  public float getProgress() {
+    return progress;
+  }
+
+  public long getAppSubmitTime() {
+    return appSubmitTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public TableDesc getResultDesc() {
+    return resultDesc;
+  }
+
+  public String getQueryMasterHost() {
+    return queryMasterHost;
+  }
+
+  public int getQueryMasterPort() {
+    return queryMasterPort;
+  }
+
+  public int getQueryMasterClientPort() {
+    return queryMasterClientPort;
+  }
+
+  /**
+   * Processes a heartbeat sent by the QueryMaster.
+   *
+   * <p>Records the reported host, ports and state. A QUERY_FAILED state marks
+   * the QueryMaster as stopped. When a finish state is reported for a
+   * submitted query, a background thread is started (once) that waits for the
+   * YARN application to terminate.</p>
+   *
+   * @param queryHeartbeat heartbeat message of the QueryMaster
+   * @return the "executeQuery" command (attempt id, query string, plan JSON)
+   *         on the first heartbeat after the application attempt id is known;
+   *         null on every other heartbeat
+   */
+  public synchronized QueryHeartbeatResponse.ResponseCommand queryHeartbeat(QueryMasterManagerProtocol.QueryHeartbeat queryHeartbeat) {
+    this.queryMasterHost = queryHeartbeat.getQueryMasterHost();
+    this.queryMasterPort = queryHeartbeat.getQueryMasterPort();
+    this.queryMasterClientPort = queryHeartbeat.getQueryMasterClientPort();
+    this.state = queryHeartbeat.getState();
+    if(state == TajoProtos.QueryState.QUERY_FAILED) {
+      //TODO needed QueryMaster's detail status(failed before or after launching worker)
+      queryMasterStopped.set(true);
+      if(queryHeartbeat.getStatusMessage() != null) {
+        LOG.warn(queryId + " failed, " + queryHeartbeat.getStatusMessage());
+      }
+    }
+
+    if(!stopCheckThreadStarted && !queryMasterStopped.get() && isFinishState(this.state)) {
+      stopCheckThreadStarted = true;
+      startCheckingQueryMasterStop();
+    }
+    if(appAttemptId != null && !querySubmitted.get()) {
+      LOG.info("submitQuery to QueryMaster(" + queryMasterHost + ":" + queryMasterPort + ")");
+      queryMasterStopped.set(false);
+      querySubmitted.set(true);
+      List<String> params = new ArrayList<String>(3);
+      params.add(appAttemptId.toString());
+      params.add(query);
+      params.add(plan.toJson());
+      return QueryHeartbeatResponse.ResponseCommand.newBuilder()
+          .setCommand("executeQuery")
+          .addAllParams(params)
+          .build();
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * @return true if the given state is terminal. QUERY_ERROR is included so
+   *         that an errored query also triggers the stop check; otherwise
+   *         {@link #stop()} would wait forever for such a query.
+   */
+  private boolean isFinishState(TajoProtos.QueryState state) {
+    return state == TajoProtos.QueryState.QUERY_FAILED ||
+        state == TajoProtos.QueryState.QUERY_ERROR ||
+        state == TajoProtos.QueryState.QUERY_KILLED ||
+        state == TajoProtos.QueryState.QUERY_SUCCEEDED;
+  }
+
+  /**
+   * Starts a thread that waits until the YARN application is FINISHED, KILLED
+   * or FAILED and then marks the QueryMaster as stopped.
+   */
+  private void startCheckingQueryMasterStop() {
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        try {
+          monitorApplication(appId,
+              EnumSet.of(
+                  YarnApplicationState.FINISHED,
+                  YarnApplicationState.KILLED,
+                  YarnApplicationState.FAILED));
+          queryMasterStopped.set(true);
+          LOG.info("QueryMaster (" + queryId + ") stopped");
+        } catch (YarnRemoteException e) {
+          LOG.error(e.getMessage(), e);
+        }
+      }
+    };
+
+    t.start();
+  }
+
+  /**
+   * Builds the launch context of the QueryMaster (a {@link QueryMasterRunner}
+   * JVM), submits it to the ResourceManager and waits until the application
+   * is ACCEPTED.
+   *
+   * @return the current application attempt id reported by YARN
+   * @throws YarnRemoteException if submitting or monitoring the application fails
+   */
+  private ApplicationAttemptId allocateAndLaunchQueryMaster() throws YarnRemoteException {
+    LOG.info("Allocate and launch QueryMaster:" + yarnClient);
+    ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+
+    // set the application id
+    appContext.setApplicationId(appId);
+    // set the application name
+    appContext.setApplicationName("Tajo");
+
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(5);
+    appContext.setPriority(pri);
+
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue("default");
+
+    ContainerLaunchContext commonContainerLaunchContext =
+            ContainerProxy.createCommonContainerLaunchContext(masterContext.getConf());
+
+    // Setup environment by cloning from common env.
+    Map<String, String> env = commonContainerLaunchContext.getEnvironment();
+    Map<String, String> myEnv = new HashMap<String, String>(env.size());
+    myEnv.putAll(env);
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Set the local resources
+    ////////////////////////////////////////////////////////////////////////////
+    // Set the necessary command to execute the application master
+    // The list is only used by this thread, so no synchronized Vector is needed.
+    List<CharSequence> vargs = new ArrayList<CharSequence>(30);
+
+    // Set java executable command
+    //LOG.info("Setting up app master command");
+    vargs.add("${JAVA_HOME}" + "/bin/java");
+    // Set Xmx based on am memory size
+    vargs.add("-Xmx2000m");
+    // Set Remote Debugging
+    //if (!context.getQuery().getSubQuery(event.getSubQueryId()).isLeafQuery()) {
+    //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
+    //}
+    // Set class name
+    vargs.add(QueryMasterRunner.class.getCanonicalName());
+    // Program arguments, in the order QueryMasterRunner.main() reads them.
+    vargs.add(queryId.toString()); // queryId
+    vargs.add(String.valueOf(appSubmitTime));
+    vargs.add(masterContext.getQueryMasterManagerService().getBindAddress().getHostName() + ":" +
+            masterContext.getQueryMasterManagerService().getBindAddress().getPort());
+
+    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+    // Get final commmand
+    StringBuilder command = new StringBuilder();
+    for (CharSequence str : vargs) {
+      command.append(str).append(" ");
+    }
+
+    LOG.info("Completed setting up QueryMasterRunner command " + command.toString());
+    List<String> commands = new ArrayList<String>();
+    commands.add(command.toString());
+
+    final Resource resource = Records.newRecord(Resource.class);
+    // TODO - get default value from conf
+    resource.setMemory(2048);
+    resource.setVirtualCores(1);
+
+    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
+
+    ContainerLaunchContext masterContainerContext = BuilderUtils.newContainerLaunchContext(
+            null, commonContainerLaunchContext.getUser(),
+            resource, commonContainerLaunchContext.getLocalResources(), myEnv, commands,
+            myServiceData, null, new HashMap<ApplicationAccessType, String>(2));
+
+    appContext.setAMContainerSpec(masterContainerContext);
+
+    LOG.info("Submitting QueryMaster to ResourceManager");
+    yarnClient.submitApplication(appContext);
+
+    ApplicationReport appReport = monitorApplication(appId, EnumSet.of(YarnApplicationState.ACCEPTED));
+    ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
+
+    LOG.info("Launching QueryMaster with id: " + attemptId);
+
+    state = TajoProtos.QueryState.QUERY_MASTER_LAUNCHED;
+
+    return attemptId;
+  }
+
+  /**
+   * Polls the application report until the application reaches one of the
+   * given states.
+   *
+   * <p>The pause between polls starts at 100 ms and grows in 100 ms steps up
+   * to one second. An interrupt does not abort the polling; the interrupt
+   * status is restored before this method returns or throws.</p>
+   *
+   * @param appId      application to watch
+   * @param finalState states that end the polling
+   * @return the report in which one of the given states was observed
+   * @throws YarnRemoteException if fetching the application report fails
+   */
+  private ApplicationReport monitorApplication(ApplicationId appId,
+                                               Set<YarnApplicationState> finalState) throws YarnRemoteException {
+
+    long sleepTime = 100;
+    int count = 1;
+    boolean interrupted = false;
+    try {
+      while (true) {
+        // Get application report for the appId we are interested in
+        ApplicationReport report = yarnClient.getApplicationReport(appId);
+
+        LOG.info("Got application report from ASM for" + ", appId="
+                + appId.getId() + ", appAttemptId="
+                + report.getCurrentApplicationAttemptId() + ", clientToken="
+                + report.getClientToken() + ", appDiagnostics="
+                + report.getDiagnostics() + ", appMasterHost=" + report.getHost()
+                + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
+                + report.getRpcPort() + ", appStartTime=" + report.getStartTime()
+                + ", yarnAppState=" + report.getYarnApplicationState().toString()
+                + ", distributedFinalState="
+                + report.getFinalApplicationStatus().toString() + ", appTrackingUrl="
+                + report.getTrackingUrl() + ", appUser=" + report.getUser());
+
+        YarnApplicationState state = report.getYarnApplicationState();
+        if (finalState.contains(state)) {
+          return report;
+        }
+        try {
+          Thread.sleep(sleepTime);
+          sleepTime = count * 100;
+          if(count < 10) {
+            count++;
+          }
+        } catch (InterruptedException e) {
+          // Keep polling, but remember the interrupt for the caller.
+          interrupted = true;
+        }
+
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
new file mode 100644
index 0000000..a3c7b75
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.QueryMasterManagerProtocol;
+import org.apache.tajo.ipc.QueryMasterManagerProtocol.QueryHeartbeat;
+import org.apache.tajo.ipc.QueryMasterManagerProtocol.QueryHeartbeatResponse;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.rpc.ProtoBlockingRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+
+import java.net.InetSocketAddress;
+
+/**
+ * RPC service of the TajoMaster that receives the heartbeats of the
+ * QueryMasters and forwards them to the {@link QueryMasterManager} of the
+ * corresponding query.
+ */
+public class QueryMasterManagerService extends AbstractService {
+  private final static Log LOG = LogFactory.getLog(QueryMasterManagerService.class);
+
+  private final TajoMaster.MasterContext context;
+  private final TajoConf conf;
+  private final QueryMasterManagerProtocolServiceHandler masterHandler;
+  private ProtoBlockingRpcServer server;
+  /** Address the RPC server is bound to; null until {@link #start()} succeeded. */
+  private InetSocketAddress bindAddress;
+
+  private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
+  private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build();
+
+  /**
+   * @param context context of the TajoMaster; also supplies the configuration
+   */
+  public QueryMasterManagerService(TajoMaster.MasterContext context) {
+    super(QueryMasterManagerService.class.getName());
+    this.context = context;
+    this.conf = context.getConf();
+    this.masterHandler = new QueryMasterManagerProtocolServiceHandler();
+  }
+
+  /**
+   * Creates and starts the RPC server on the configured address and writes
+   * the actually bound address back into the configuration.
+   *
+   * @throws IllegalStateException if the RPC server cannot be created
+   */
+  @Override
+  public void start() {
+    // TODO resolve hostname
+    String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS);
+    InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+    try {
+      server = new ProtoBlockingRpcServer(QueryMasterManagerProtocol.class, masterHandler, initIsa);
+    } catch (Exception e) {
+      // Fail fast with the real cause; continuing would only end in a
+      // NullPointerException on the server.start() call below.
+      LOG.error(e.getMessage(), e);
+      throw new IllegalStateException(
+          "Failed to create the RPC server of QueryMasterManagerService at " + initIsa, e);
+    }
+    server.start();
+    bindAddress = server.getBindAddress();
+    this.conf.setVar(TajoConf.ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS,
+            org.apache.tajo.util.NetUtils.getIpPortString(bindAddress));
+    LOG.info("Instantiated QueryMasterManagerService at " + this.bindAddress);
+    super.start();
+  }
+
+  /** Shuts down the RPC server (if any) and stops the service. */
+  @Override
+  public void stop() {
+    if(server != null) {
+      server.shutdown();
+      server = null;
+    }
+    super.stop();
+  }
+
+  public InetSocketAddress getBindAddress() {
+    return bindAddress;
+  }
+
+  /** Blocking implementation of the heartbeat protocol. */
+  public class QueryMasterManagerProtocolServiceHandler implements QueryMasterManagerProtocol.QueryMasterManagerProtocolService.BlockingInterface {
+    /**
+     * Dispatches a heartbeat to the manager of the query it belongs to.
+     *
+     * @return a response whose heartbeat result is false when the query is
+     *         unknown to the master; otherwise true, together with the
+     *         command returned by the manager, if there is one
+     */
+    @Override
+    public QueryHeartbeatResponse queryHeartbeat(RpcController controller, QueryHeartbeat request) throws ServiceException {
+      // TODO - separate QueryMasterManagerProtocol, ClientServiceProtocol
+      QueryId queryId = new QueryId(request.getQueryId());
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Received QueryHeartbeat:" + queryId + "," + request);
+      }
+      QueryMasterManager queryMasterManager = context.getQuery(queryId);
+      if (queryMasterManager == null) {
+        LOG.warn("No query:" + queryId);
+        return QueryHeartbeatResponse.newBuilder().setHeartbeatResult(BOOL_FALSE).build();
+      }
+
+      QueryHeartbeatResponse.ResponseCommand command = queryMasterManager.queryHeartbeat(request);
+
+      //ApplicationAttemptId attemptId = queryMasterManager.getAppAttemptId();
+      //String attemptIdStr = attemptId == null ? null : attemptId.toString();
+      QueryHeartbeatResponse.Builder builder = QueryHeartbeatResponse.newBuilder();
+      builder.setHeartbeatResult(BOOL_TRUE);
+      if(command != null) {
+        builder.setResponseCommand(command);
+      }
+      return builder.build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
new file mode 100644
index 0000000..f34464b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.QueryConf;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.util.TajoIdUtils;
+
+import java.io.PrintWriter;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+
+/**
+ * Entry point of the QueryMaster process: parses the command line, then
+ * creates, initializes and starts the {@link QueryMaster} of one query.
+ */
+public class QueryMasterRunner extends AbstractService {
+  private static final Log LOG = LogFactory.getLog(QueryMasterRunner.class);
+  private QueryConf queryConf;
+  /** The QueryMaster created by {@link #start()}; null before that. */
+  private QueryMaster queryMaster;
+  private QueryId queryId;
+  private long appSubmitTime;
+  private String queryMasterManagerAddress;
+
+  /**
+   * @param queryId                   id of the query to run
+   * @param appSubmitTime             submission time handed over on the command line
+   * @param queryMasterManagerAddress address string passed on to the QueryMaster
+   */
+  public QueryMasterRunner(QueryId queryId, long appSubmitTime, String queryMasterManagerAddress) {
+    super(QueryMasterRunner.class.getName());
+    this.queryId = queryId;
+    this.appSubmitTime = appSubmitTime;
+    this.queryMasterManagerAddress = queryMasterManagerAddress;
+  }
+
+  /** JVM shutdown hook that logs the shutdown and calls {@link #stop()}. */
+  private class ShutdownHook implements Runnable {
+    @Override
+    public void run() {
+      LOG.info("============================================");
+      LOG.info("QueryMaster received SIGINT Signal");
+      LOG.info("============================================");
+      stop();
+    }
+  }
+
+  /**
+   * Stores the configuration, initializes the rack resolver and registers
+   * the shutdown hook.
+   *
+   * @param conf must be a {@link QueryConf}
+   */
+  @Override
+  public void init(Configuration conf) {
+    this.queryConf = (QueryConf)conf;
+    RackResolver.init(queryConf);
+
+    Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
+
+    super.init(conf);
+  }
+
+  /** Creates, initializes and starts the QueryMaster, then marks this service as started. */
+  @Override
+  public void start() {
+    //create QueryMaster
+    // Keep the instance in the field instead of a local that is lost on return.
+    queryMaster = new QueryMaster(queryId, appSubmitTime, queryMasterManagerAddress);
+
+    queryMaster.init(queryConf);
+    queryMaster.start();
+
+    super.start();
+  }
+
+  /**
+   * Currently a no-op.
+   * NOTE(review): the QueryMaster is not stopped here; confirm whether the
+   * shutdown hook is expected to stop it.
+   */
+  @Override
+  public void stop() {
+  }
+
+  /**
+   * Runs a QueryMaster.
+   *
+   * @param args {@code <queryId> <appSubmitTime> <queryMasterManagerAddress>}
+   * @throws IllegalArgumentException if fewer than three arguments are given
+   */
+  public static void main(String[] args) throws Exception {
+    LOG.info("QueryMasterRunner started");
+
+    if (args.length < 3) {
+      throw new IllegalArgumentException(
+          "Usage: QueryMasterRunner <queryId> <appSubmitTime> <queryMasterManagerAddress>");
+    }
+
+    final QueryConf conf = new QueryConf();
+    conf.addResource(new Path(QueryConf.FILENAME));
+
+    UserGroupInformation.setConfiguration(conf);
+
+    final QueryId queryId = TajoIdUtils.createQueryId(args[0]);
+    final long appSubmitTime = Long.parseLong(args[1]);
+    final String queryMasterManagerAddr = args[2];
+
+    LOG.info("Received QueryId:" + queryId);
+
+    QueryMasterRunner queryMasterRunner = new QueryMasterRunner(queryId, appSubmitTime, queryMasterManagerAddr);
+    queryMasterRunner.init(conf);
+    queryMasterRunner.start();
+
+    // Park the main thread until someone notifies on the queryId monitor.
+    // NOTE(review): presumably the QueryMaster notifies on this object when it
+    // stops; a spurious wakeup would also end the process - confirm.
+    synchronized(queryId) {
+      queryId.wait();
+    }
+
+    System.exit(0);
+  }
+
+  /**
+   * Writes a dump of all threads of this JVM to the given stream.
+   *
+   * @param stream destination of the dump; flushed at the end
+   * @param title  text printed in the first line of the dump
+   */
+  public static void printThreadInfo(PrintWriter stream, String title) {
+    ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+    final int STACK_DEPTH = 60;
+    boolean contention = threadBean.isThreadContentionMonitoringEnabled();
+    long[] threadIds = threadBean.getAllThreadIds();
+    stream.println("Process Thread Dump: " + title);
+    stream.println(threadIds.length + " active threads");
+    for (long tid : threadIds) {
+      ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH);
+      if (info == null) {
+        // No info is available for this id any more.
+        stream.println("  Inactive");
+        continue;
+      }
+      stream.println("Thread " + getTaskName(info.getThreadId(), info.getThreadName()) + ":");
+      Thread.State state = info.getThreadState();
+      stream.println("  State: " + state);
+      stream.println("  Blocked count: " + info.getBlockedCount());
+      stream.println("  Waited count: " + info.getWaitedCount());
+      if (contention) {
+        stream.println("  Blocked time: " + info.getBlockedTime());
+        stream.println("  Waited time: " + info.getWaitedTime());
+      }
+      if (state == Thread.State.WAITING) {
+        stream.println("  Waiting on " + info.getLockName());
+      } else if (state == Thread.State.BLOCKED) {
+        stream.println("  Blocked on " + info.getLockName());
+        stream.println("  Blocked by " + getTaskName(info.getLockOwnerId(), info.getLockOwnerName()));
+      }
+      stream.println("  Stack:");
+      for (StackTraceElement frame : info.getStackTrace()) {
+        stream.println("    " + frame.toString());
+      }
+    }
+    stream.flush();
+  }
+
+  /** @return {@code "id (name)"}, or just the id when the name is null */
+  private static String getTaskName(long id, String name) {
+    if (name == null) {
+      return Long.toString(id);
+    }
+    return id + " (" + name + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
new file mode 100644
index 0000000..fcb8f3e
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.ipc.QueryMasterProtocol.Partition;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.master.TaskState;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.storage.Fragment;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class QueryUnit implements EventHandler<TaskEvent> {
+  /** Class Logger */
+  private static final Log LOG = LogFactory.getLog(QueryUnit.class);
+
+	private QueryUnitId taskId;
+  private EventHandler eventHandler;
+	private StoreTableNode store = null;
+	private LogicalNode plan = null;
+	private List<ScanNode> scan;
+	
+	private Map<String, Fragment> fragMap;
+	private Map<String, Set<URI>> fetchMap;
+	
+  private List<Partition> partitions;
+	private TableStat stats;
+  private String [] dataLocations;
+  private final boolean isLeafTask;
+  private List<IntermediateEntry> intermediateData;
+
+  private Map<QueryUnitAttemptId, QueryUnitAttempt> attempts;
+  private final int maxAttempts = 3;
+  private Integer lastAttemptId;
+
+  private QueryUnitAttemptId successfulAttempt;
+  private String succeededHost;
+  private int succeededPullServerPort;
+
+  private int failedAttempts;
+  private int finishedAttempts; // finish are total of success, failed and killed
+
+  private static final StateMachineFactory
+      <QueryUnit, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
+      new StateMachineFactory
+          <QueryUnit, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
+
+      .addTransition(TaskState.NEW, TaskState.SCHEDULED,
+          TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
+
+       .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
+           TaskEventType.T_ATTEMPT_LAUNCHED)
+
+        .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+           TaskEventType.T_ATTEMPT_LAUNCHED)
+
+       .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
+           TaskEventType.T_ATTEMPT_SUCCEEDED, new AttemptSucceededTransition())
+
+       .addTransition(TaskState.RUNNING,
+            EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
+            TaskEventType.T_ATTEMPT_FAILED, new AttemptFailedTransition())
+
+
+
+      .installTopology();
+  private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine;
+
+
+  private final Lock readLock;
+  private final Lock writeLock;
+
+	public QueryUnit(QueryUnitId id, boolean isLeafTask, EventHandler eventHandler) {
+		this.taskId = id;
+    this.eventHandler = eventHandler;
+    this.isLeafTask = isLeafTask;
+		scan = new ArrayList<ScanNode>();
+    fetchMap = Maps.newHashMap();
+    fragMap = Maps.newHashMap();
+    partitions = new ArrayList<Partition>();
+    attempts = Collections.emptyMap();
+    lastAttemptId = -1;
+    failedAttempts = 0;
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+
+    stateMachine = stateMachineFactory.make(this);
+	}
+
+  public boolean isLeafTask() {
+    return this.isLeafTask;
+  }
+
+  public void setDataLocations(String [] dataLocations) {
+    this.dataLocations = dataLocations;
+  }
+
+  public String [] getDataLocations() {
+    return this.dataLocations;
+  }
+
+  public TaskState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+	
+	public void setLogicalPlan(LogicalNode plan) {
+    Preconditions.checkArgument(plan.getType() == ExprType.STORE ||
+        plan.getType() == ExprType.CREATE_INDEX);
+    
+	  this.plan = plan;
+    store = (StoreTableNode) plan;
+
+	  LogicalNode node = plan;
+	  ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
+	  s.add(node);
+	  while (!s.isEmpty()) {
+	    node = s.remove(s.size()-1);
+	    if (node instanceof UnaryNode) {
+	      UnaryNode unary = (UnaryNode) node;
+	      s.add(s.size(), unary.getSubNode());
+	    } else if (node instanceof BinaryNode) {
+	      BinaryNode binary = (BinaryNode) node;
+	      s.add(s.size(), binary.getOuterNode());
+	      s.add(s.size(), binary.getInnerNode());
+	    } else if (node instanceof ScanNode) {
+	      scan.add((ScanNode)node);
+	    }
+	  }
+	}
+
+  @Deprecated
+  public void setFragment(String tableId, Fragment fragment) {
+    this.fragMap.put(tableId, fragment);
+    if (fragment.hasDataLocations()) {
+      setDataLocations(fragment.getDataLocations());
+    }
+  }
+
+  public void setFragment2(Fragment fragment) {
+    this.fragMap.put(fragment.getId(), fragment);
+    if (fragment.hasDataLocations()) {
+      setDataLocations(fragment.getDataLocations());
+    }
+  }
+	
+	public void addFetch(String tableId, String uri) throws URISyntaxException {
+	  this.addFetch(tableId, new URI(uri));
+	}
+	
+	public void addFetch(String tableId, URI uri) {
+	  Set<URI> uris;
+	  if (fetchMap.containsKey(tableId)) {
+	    uris = fetchMap.get(tableId);
+	  } else {
+	    uris = Sets.newHashSet();
+	  }
+	  uris.add(uri);
+    fetchMap.put(tableId, uris);
+	}
+	
+	public void addFetches(String tableId, Collection<URI> urilist) {
+	  Set<URI> uris;
+    if (fetchMap.containsKey(tableId)) {
+      uris = fetchMap.get(tableId);
+    } else {
+      uris = Sets.newHashSet();
+    }
+    uris.addAll(urilist);
+    fetchMap.put(tableId, uris);
+	}
+	
+	public void setFetches(Map<String, Set<URI>> fetches) {
+	  this.fetchMap.clear();
+	  this.fetchMap.putAll(fetches);
+	}
+	
+  public Fragment getFragment(String tableId) {
+    return this.fragMap.get(tableId);
+  }
+
+  public Collection<Fragment> getAllFragments() {
+    return fragMap.values();
+  }
+	
+	public LogicalNode getLogicalPlan() {
+	  return this.plan;
+	}
+	
+	public QueryUnitId getId() {
+		return taskId;
+	}
+	
+	public Collection<URI> getFetchHosts(String tableId) {
+	  return fetchMap.get(tableId);
+	}
+	
+	public Collection<Set<URI>> getFetches() {
+	  return fetchMap.values();
+	}
+	
+	public Collection<URI> getFetch(ScanNode scan) {
+	  return this.fetchMap.get(scan.getTableId());
+	}
+
+	public String getOutputName() {
+		return this.store.getTableName();
+	}
+	
+	public Schema getOutputSchema() {
+	  return this.store.getOutSchema();
+	}
+	
+	public StoreTableNode getStoreTableNode() {
+	  return this.store;
+	}
+	
+	public ScanNode[] getScanNodes() {
+	  return this.scan.toArray(new ScanNode[scan.size()]);
+	}
+	
+	@Override
+	public String toString() {
+		String str = new String(plan.getType() + " \n");
+		for (Entry<String, Fragment> e : fragMap.entrySet()) {
+		  str += e.getKey() + " : ";
+      str += e.getValue() + " ";
+		}
+		for (Entry<String, Set<URI>> e : fetchMap.entrySet()) {
+      str += e.getKey() + " : ";
+      for (URI t : e.getValue()) {
+        str += t + " ";
+      }
+    }
+		
+		return str;
+	}
+	
+	public void setStats(TableStat stats) {
+	  this.stats = stats;
+	}
+	
+	public void setPartitions(List<Partition> partitions) {
+	  this.partitions = Collections.unmodifiableList(partitions);
+	}
+	
+	public TableStat getStats() {
+	  return this.stats;
+	}
+	
+	public List<Partition> getPartitions() {
+	  return this.partitions;
+	}
+	
+	public int getPartitionNum() {
+	  return this.partitions.size();
+	}
+
+  public QueryUnitAttempt newAttempt() {
+    QueryUnitAttempt attempt = new QueryUnitAttempt(
+        QueryIdFactory.newQueryUnitAttemptId(this.getId(),
+            ++lastAttemptId), this, eventHandler);
+    return attempt;
+  }
+
+  public QueryUnitAttempt getAttempt(QueryUnitAttemptId attemptId) {
+    return attempts.get(attemptId);
+  }
+
+  public QueryUnitAttempt getAttempt(int attempt) {
+    return this.attempts.get(new QueryUnitAttemptId(this.getId(), attempt));
+  }
+
+  public QueryUnitAttempt getLastAttempt() {
+    return this.attempts.get(this.lastAttemptId);
+  }
+
+  protected QueryUnitAttempt getSuccessfulAttempt() {
+    readLock.lock();
+    try {
+      if (null == successfulAttempt) {
+        return null;
+      }
+      return attempts.get(successfulAttempt);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public int getRetryCount () {
+    return this.lastAttemptId;
+  }
+
+  private static class InitialScheduleTransition implements
+    SingleArcTransition<QueryUnit, TaskEvent> {
+
+    @Override
+    public void transition(QueryUnit task, TaskEvent taskEvent) {
+      task.addAndScheduleAttempt();
+    }
+  }
+
+  // This is always called in the Write Lock
+  private void addAndScheduleAttempt() {
+    // Create new task attempt
+    QueryUnitAttempt attempt = newAttempt();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Created attempt " + attempt.getId());
+    }
+    switch (attempts.size()) {
+      case 0:
+        attempts = Collections.singletonMap(attempt.getId(), attempt);
+        break;
+
+      case 1:
+        Map<QueryUnitAttemptId, QueryUnitAttempt> newAttempts
+            = new LinkedHashMap<QueryUnitAttemptId, QueryUnitAttempt>(3);
+        newAttempts.putAll(attempts);
+        attempts = newAttempts;
+        attempts.put(attempt.getId(), attempt);
+        break;
+
+      default:
+        attempts.put(attempt.getId(), attempt);
+        break;
+    }
+
+    if (failedAttempts > 0) {
+      eventHandler.handle(new TaskAttemptEvent(attempt.getId(),
+          TaskAttemptEventType.TA_RESCHEDULE));
+    } else {
+      eventHandler.handle(new TaskAttemptEvent(attempt.getId(),
+          TaskAttemptEventType.TA_SCHEDULE));
+    }
+  }
+
+  private static class AttemptSucceededTransition
+      implements SingleArcTransition<QueryUnit, TaskEvent>{
+
+    @Override
+    public void transition(QueryUnit task,
+                           TaskEvent event) {
+      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
+      QueryUnitAttempt attempt = task.attempts.get(
+          attemptEvent.getTaskAttemptId());
+      task.successfulAttempt = attemptEvent.getTaskAttemptId();
+      task.succeededHost = attempt.getHost();
+      task.succeededPullServerPort = attempt.getPullServerPort();
+      task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(),
+          SubQueryEventType.SQ_TASK_COMPLETED));
+    }
+  }
+
+  private static class AttemptFailedTransition implements
+    MultipleArcTransition<QueryUnit, TaskEvent, TaskState> {
+
+    @Override
+    public TaskState transition(QueryUnit task, TaskEvent taskEvent) {
+      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent;
+      LOG.info("=============================================================");
+      LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<");
+      LOG.info("=============================================================");
+      task.failedAttempts++;
+      task.finishedAttempts++;
+
+      if (task.failedAttempts < task.maxAttempts) {
+        if (task.successfulAttempt == null) {
+          task.addAndScheduleAttempt();
+        }
+      } else {
+        task.eventHandler.handle(
+            new SubQueryTaskEvent(task.getId(), SubQueryEventType.SQ_FAILED));
+        return TaskState.FAILED;
+      }
+
+      return task.getState();
+    }
+  }
+
+  @Override
+  public void handle(TaskEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getTaskId() + " of type "
+          + event.getType());
+    }
+
+    try {
+      writeLock.lock();
+      TaskState oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        eventHandler.handle(new QueryEvent(getId().getQueryId(),
+            QueryEventType.INTERNAL_ERROR));
+      }
+
+      //notify the eventhandler of state change
+      if (LOG.isDebugEnabled()) {
+        if (oldState != getState()) {
+          LOG.debug(taskId + " Task Transitioned from " + oldState + " to "
+              + getState());
+        }
+      }
+    }
+
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void setIntermediateData(Collection<IntermediateEntry> partitions) {
+    this.intermediateData = new ArrayList<IntermediateEntry>(partitions);
+  }
+
+  public List<IntermediateEntry> getIntermediateData() {
+    return this.intermediateData;
+  }
+
+  public static class IntermediateEntry {
+    int taskId;
+    int attemptId;
+    int partitionId;
+    String pullHost;
+    int port;
+
+    public IntermediateEntry(int taskId, int attemptId, int partitionId,
+                             String pullServerAddr, int pullServerPort) {
+      this.taskId = taskId;
+      this.attemptId = attemptId;
+      this.partitionId = partitionId;
+      this.pullHost = pullServerAddr;
+      this.port = pullServerPort;
+    }
+
+    public int getTaskId() {
+      return this.taskId;
+    }
+
+    public int getAttemptId() {
+      return this.attemptId;
+    }
+
+    public int getPartitionId() {
+      return this.partitionId;
+    }
+
+    public String getPullHost() {
+      return this.pullHost;
+    }
+
+    public int getPullPort() {
+      return port;
+    }
+
+    public String getPullAddress() {
+      return pullHost + ":" + port;
+    }
+  }
+}