You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 11:19:48 UTC

[26/51] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
new file mode 100644
index 0000000..c968a73
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -0,0 +1,754 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.*;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.NoSuchDatabaseException;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.*;
+import org.apache.tajo.ipc.TajoMasterClientProtocol;
+import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.master.querymaster.QueryInfo;
+import org.apache.tajo.master.querymaster.QueryJobEvent;
+import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.master.session.InvalidSessionException;
+import org.apache.tajo.master.session.NoSuchSessionVariableException;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.rpc.BlockingRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.ProtoUtil;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+
+public class TajoMasterClientService extends AbstractService {
+  private final static Log LOG = LogFactory.getLog(TajoMasterClientService.class);
+  // Master context passed to the constructor; conf and catalog below are taken from it.
+  private final MasterContext context;
+  private final TajoConf conf;
+  private final CatalogService catalog;
+  // Handler instance that start() hands to the RPC server.
+  private final TajoMasterClientProtocolServiceHandler clientHandler;
+  // Created in start() and shut down in stop(); null until start() has run.
+  private BlockingRpcServer server;
+  // Derived from the server's listen address in start(); null until then.
+  private InetSocketAddress bindAddress;
+
+  // Pre-built boolean responses returned by several handler methods.
+  private final BoolProto BOOL_TRUE =
+      BoolProto.newBuilder().setValue(true).build();
+  private final BoolProto BOOL_FALSE =
+      BoolProto.newBuilder().setValue(false).build();
+
+  /**
+   * Creates the service and its RPC handler. The RPC server itself is not
+   * created here; that happens in {@link #start()}.
+   *
+   * @param context master context from which the configuration and catalog are obtained
+   */
+  public TajoMasterClientService(MasterContext context) {
+    super(TajoMasterClientService.class.getName());
+    this.clientHandler = new TajoMasterClientProtocolServiceHandler();
+    this.context = context;
+    this.conf = context.getConf();
+    this.catalog = context.getCatalog();
+  }
+
+  /**
+   * Creates and starts the blocking RPC server on the configured client RPC
+   * address, then writes the address actually bound back into the
+   * configuration before marking this service as started.
+   *
+   * @throws RuntimeException if the RPC server cannot be instantiated
+   */
+  @Override
+  public void start() {
+    // start the rpc server
+    final InetSocketAddress configuredAddress =
+        NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS));
+    final int workerThreadCount = conf.getIntVar(ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM);
+
+    try {
+      server = new BlockingRpcServer(
+          TajoMasterClientProtocol.class, clientHandler, configuredAddress, workerThreadCount);
+    } catch (Exception e) {
+      LOG.error(e);
+      throw new RuntimeException(e);
+    }
+    server.start();
+
+    // Publish the address the server really listens on so later readers of the conf see it.
+    bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
+    conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
+    LOG.info("Instantiated TajoMasterClientService at " + bindAddress);
+
+    super.start();
+  }
+
+  /**
+   * Shuts down the RPC server, if one was created, and then stops this service.
+   */
+  @Override
+  public void stop() {
+    // server stays null when start() was never called or failed before assigning it
+    if (null != server) {
+      server.shutdown();
+    }
+
+    super.stop();
+  }
+
+  /**
+   * @return the address computed in {@link #start()}, or {@code null} if the service has not been started
+   */
+  public InetSocketAddress getBindAddress() {
+    return bindAddress;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+  // TajoMasterClientProtocolService
+  /////////////////////////////////////////////////////////////////////////////
+  public class TajoMasterClientProtocolServiceHandler implements TajoMasterClientProtocolService.BlockingInterface {
+    @Override
+    public CreateSessionResponse createSession(RpcController controller, CreateSessionRequest request)
+        throws ServiceException {
+      try {
+        // create a new session with base database name. If no database name is give, we use default database.
+        String databaseName = request.hasBaseDatabaseName() ? request.getBaseDatabaseName() : DEFAULT_DATABASE_NAME;
+
+        if (!context.getCatalog().existDatabase(databaseName)) {
+          LOG.info("Session creation is canceled due to absent base database \"" + databaseName + "\".");
+          throw new NoSuchDatabaseException(databaseName);
+        }
+
+        String sessionId =
+            context.getSessionManager().createSession(request.getUsername(), databaseName);
+        CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder();
+        builder.setState(CreateSessionResponse.ResultState.SUCCESS);
+        builder.setSessionId(TajoIdProtos.SessionIdProto.newBuilder().setId(sessionId).build());
+        return builder.build();
+      } catch (NoSuchDatabaseException nsde) {
+        CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder();
+        builder.setState(CreateSessionResponse.ResultState.FAILED);
+        builder.setMessage(nsde.getMessage());
+        return builder.build();
+      } catch (InvalidSessionException e) {
+        CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder();
+        builder.setState(CreateSessionResponse.ResultState.FAILED);
+        builder.setMessage(e.getMessage());
+        return builder.build();
+      }
+    }
+
+    /**
+     * Removes the session identified by the request. A {@code null} request is
+     * ignored. The method always answers TRUE.
+     */
+    @Override
+    public BoolProto removeSession(RpcController controller, TajoIdProtos.SessionIdProto request)
+        throws ServiceException {
+      if (request == null) {
+        return ProtoUtil.TRUE;
+      }
+
+      context.getSessionManager().removeSession(request.getId());
+      return ProtoUtil.TRUE;
+    }
+
+    /**
+     * Applies the variable assignments in the request to the session first, then
+     * removes the variables listed for unsetting.
+     *
+     * @return TRUE when all updates were applied
+     * @throws ServiceException wrapping any failure raised while updating the session
+     */
+    @Override
+    public BoolProto updateSessionVariables(RpcController controller, UpdateSessionVariableRequest request)
+        throws ServiceException {
+      try {
+        final String targetSession = request.getSessionId().getId();
+
+        List<CatalogProtos.KeyValueProto> assignments = request.getSetVariables().getKeyvalList();
+        for (CatalogProtos.KeyValueProto assignment : assignments) {
+          context.getSessionManager().setVariable(targetSession, assignment.getKey(), assignment.getValue());
+        }
+
+        for (String variableName : request.getUnsetVariablesList()) {
+          context.getSessionManager().removeVariable(targetSession, variableName);
+        }
+
+        return ProtoUtil.TRUE;
+      } catch (Throwable cause) {
+        throw new ServiceException(cause);
+      }
+    }
+
+    /**
+     * Looks up one session variable. The request's value field holds the variable name.
+     *
+     * @throws ServiceException wrapping any failure raised by the lookup
+     */
+    @Override
+    public StringProto getSessionVariable(RpcController controller, SessionedStringProto request)
+        throws ServiceException {
+      try {
+        String variableValue =
+            context.getSessionManager().getVariable(request.getSessionId().getId(), request.getValue());
+        return ProtoUtil.convertString(variableValue);
+      } catch (Throwable cause) {
+        throw new ServiceException(cause);
+      }
+    }
+
+    /**
+     * Reports whether the session holds a variable with the name given in the
+     * request's value field.
+     *
+     * @return TRUE when the lookup yields a non-null value; FALSE when it yields
+     *         null or raises {@link NoSuchSessionVariableException}
+     * @throws ServiceException wrapping any other failure
+     */
+    @Override
+    public BoolProto existSessionVariable(RpcController controller, SessionedStringProto request) throws ServiceException {
+      try {
+        String found = context.getSessionManager().getVariable(request.getSessionId().getId(), request.getValue());
+        return (found == null) ? ProtoUtil.FALSE : ProtoUtil.TRUE;
+      } catch (NoSuchSessionVariableException absent) {
+        return ProtoUtil.FALSE;
+      } catch (Throwable cause) {
+        throw new ServiceException(cause);
+      }
+    }
+
+    /**
+     * Returns every variable of the given session as a key-value set.
+     *
+     * @throws ServiceException wrapping any failure raised while reading the session
+     */
+    @Override
+    public CatalogProtos.KeyValueSetProto getAllSessionVariables(RpcController controller,
+                                                                 TajoIdProtos.SessionIdProto request)
+        throws ServiceException {
+      try {
+        Options variables = new Options();
+        variables.putAll(context.getSessionManager().getAllVariables(request.getId()));
+        return variables.getProto();
+      } catch (Throwable cause) {
+        throw new ServiceException(cause);
+      }
+    }
+
+    /**
+     * Returns the name of the database currently selected in the given session.
+     *
+     * @throws ServiceException wrapping any failure raised while resolving the session
+     */
+    @Override
+    public StringProto getCurrentDatabase(RpcController controller, TajoIdProtos.SessionIdProto request)
+        throws ServiceException {
+      try {
+        Session session = context.getSessionManager().getSession(request.getId());
+        return ProtoUtil.convertString(session.getCurrentDatabase());
+      } catch (Throwable cause) {
+        throw new ServiceException(cause);
+      }
+    }
+
+    /**
+     * Switches the session's current database to the one named in the request's
+     * value field.
+     *
+     * @return TRUE when the database exists and has been selected
+     * @throws ServiceException whose cause is {@link NoSuchDatabaseException} when the
+     *         database does not exist, or whichever failure was raised while resolving
+     *         the session
+     */
+    @Override
+    public BoolProto selectDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
+      try {
+        String sessionId = request.getSessionId().getId();
+        String databaseName = request.getValue();
+
+        if (!context.getCatalog().existDatabase(databaseName)) {
+          // Throw the bare cause: the catch below wraps it exactly once. Throwing a
+          // ServiceException here would be caught and wrapped a second time.
+          throw new NoSuchDatabaseException(databaseName);
+        }
+
+        context.getSessionManager().getSession(sessionId).selectDatabase(databaseName);
+        return ProtoUtil.TRUE;
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
+    }
+
+    @Override
+    public SubmitQueryResponse submitQuery(RpcController controller, QueryRequest request) throws ServiceException {
+
+
+      try {
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Query [" + request.getQuery() + "] is submitted");
+        }
+        return context.getGlobalEngine().executeQuery(session, request.getQuery());
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        SubmitQueryResponse.Builder responseBuilder = ClientProtos.SubmitQueryResponse.newBuilder();
+        responseBuilder.setUserName(context.getConf().getVar(ConfVars.USERNAME));
+        responseBuilder.setResultCode(ResultCode.ERROR);
+        if (e.getMessage() != null) {
+          responseBuilder.setErrorMessage(ExceptionUtils.getStackTrace(e));
+        } else {
+          responseBuilder.setErrorMessage("Internal Error");
+        }
+        return responseBuilder.build();
+      }
+    }
+
+    /**
+     * Runs an update-style query through the global engine within the caller's session.
+     *
+     * @return an OK response on success. When the engine fails, an ERROR response is
+     *         returned whose error message is the exception message, or the stack
+     *         trace if the exception carries no message.
+     * @throws ServiceException wrapping any failure raised while resolving the session
+     */
+    @Override
+    public UpdateQueryResponse updateQuery(RpcController controller, QueryRequest request) throws ServiceException {
+
+      try {
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+        UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder();
+        try {
+          context.getGlobalEngine().updateQuery(session, request.getQuery());
+          builder.setResultCode(ResultCode.OK);
+          return builder.build();
+        } catch (Exception e) {
+          builder.setResultCode(ResultCode.ERROR);
+          // Always report something: previously a non-null message was silently dropped,
+          // leaving the client with an ERROR code and no explanation.
+          String message = e.getMessage();
+          builder.setErrorMessage(message != null ? message : ExceptionUtils.getStackTrace(e));
+          return builder.build();
+        }
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
+    }
+
+    /**
+     * Reports the outcome of a query, looking first among the in-progress queries
+     * and then among the finished ones.
+     *
+     * @return a response with no error message when the query succeeded. Otherwise
+     *         the error message says the query is unknown, has failed, or is still
+     *         running.
+     * @throws ServiceException wrapping any failure raised while handling the request
+     */
+    @Override
+    public GetQueryResultResponse getQueryResult(RpcController controller,
+                                                 GetQueryResultRequest request) throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+        QueryId queryId = new QueryId(request.getQueryId());
+        QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
+
+        // if we cannot get a QueryInProgress instance from QueryJobManager,
+        // the instance can be in the finished query list.
+        if (queryInProgress == null) {
+          queryInProgress = context.getQueryJobManager().getFinishedQuery(queryId);
+        }
+
+        GetQueryResultResponse.Builder builder = GetQueryResultResponse.newBuilder();
+
+        // If we cannot get the QueryInProgress instance from the finished list either,
+        // the query result was expired due to timeout.
+        // In this case, we will result in error.
+        if (queryInProgress == null) {
+          builder.setErrorMessage("No such query: " + queryId.toString());
+          return builder.build();
+        }
+
+        QueryInfo queryInfo = queryInProgress.getQueryInfo();
+
+        try {
+          //TODO After implementation Tajo's user security feature, Should be modified.
+          builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName());
+        } catch (IOException e) {
+          LOG.warn("Can't get current user name");
+        }
+        switch (queryInfo.getQueryState()) {
+          case QUERY_SUCCEEDED:
+            // TODO check this logic needed
+            //builder.setTableDesc((TableDescProto) queryJobManager.getResultDesc().getProto());
+            break;
+          case QUERY_FAILED:
+          case QUERY_ERROR:
+            builder.setErrorMessage("Query " + queryId + " is failed");
+            // Without this break the failure message was overwritten by the
+            // "still running" message of the default branch.
+            break;
+          default:
+            builder.setErrorMessage("Query " + queryId + " is still running");
+            break;
+        }
+
+        return builder.build();
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
+    }
+
+    /**
+     * Lists brief information about every query the job manager reports as running.
+     * For a query whose finish time is still 0, the current time is reported as its
+     * finish time.
+     *
+     * @throws ServiceException wrapping any failure raised while building the list
+     */
+    @Override
+    public GetQueryListResponse getRunningQueryList(RpcController controller, GetQueryListRequest request)
+        throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+
+        GetQueryListResponse.Builder response = GetQueryListResponse.newBuilder();
+        Collection<QueryInProgress> runningQueries = context.getQueryJobManager().getRunningQueries();
+
+        // One builder is reused; every field below is overwritten on each iteration.
+        BriefQueryInfo.Builder brief = BriefQueryInfo.newBuilder();
+        for (QueryInProgress running : runningQueries) {
+          QueryInfo info = running.getQueryInfo();
+
+          brief.setQueryId(info.getQueryId().getProto());
+          brief.setState(info.getQueryState());
+          brief.setQuery(info.getSql());
+          brief.setStartTime(info.getStartTime());
+
+          long reportedFinishTime = (info.getFinishTime() != 0)
+              ? info.getFinishTime()
+              : System.currentTimeMillis();
+          brief.setFinishTime(reportedFinishTime);
+
+          brief.setProgress(info.getProgress());
+          brief.setQueryMasterPort(info.getQueryMasterPort());
+          brief.setQueryMasterHost(info.getQueryMasterHost());
+
+          response.addQueryList(brief.build());
+        }
+
+        return response.build();
+      } catch (Throwable cause) {
+        throw new ServiceException(cause);
+      }
+    }
+
+    /**
+     * Lists brief information about every query the job manager reports as finished.
+     * For a query whose finish time is 0, the current time is reported as its finish time.
+     *
+     * @throws ServiceException wrapping any failure raised while building the list
+     */
+    @Override
+    public GetQueryListResponse getFinishedQueryList(RpcController controller, GetQueryListRequest request)
+        throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+
+        GetQueryListResponse.Builder response = GetQueryListResponse.newBuilder();
+        Collection<QueryInProgress> finishedQueries = context.getQueryJobManager().getFinishedQueries();
+
+        // One builder is reused; every field below is overwritten on each iteration.
+        BriefQueryInfo.Builder brief = BriefQueryInfo.newBuilder();
+        for (QueryInProgress finished : finishedQueries) {
+          QueryInfo info = finished.getQueryInfo();
+
+          brief.setQueryId(info.getQueryId().getProto());
+          brief.setState(info.getQueryState());
+          brief.setQuery(info.getSql());
+          brief.setStartTime(info.getStartTime());
+
+          long reportedFinishTime = (info.getFinishTime() != 0)
+              ? info.getFinishTime()
+              : System.currentTimeMillis();
+          brief.setFinishTime(reportedFinishTime);
+
+          brief.setProgress(info.getProgress());
+          brief.setQueryMasterPort(info.getQueryMasterPort());
+          brief.setQueryMasterHost(info.getQueryMasterHost());
+
+          response.addQueryList(brief.build());
+        }
+
+        return response.build();
+      } catch (Throwable cause) {
+        throw new ServiceException(cause);
+      }
+    }
+
+    @Override
+    public GetQueryStatusResponse getQueryStatus(RpcController controller,
+                                                 GetQueryStatusRequest request)
+        throws ServiceException {
+
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+
+        GetQueryStatusResponse.Builder builder = GetQueryStatusResponse.newBuilder();
+        QueryId queryId = new QueryId(request.getQueryId());
+        builder.setQueryId(request.getQueryId());
+
+        if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
+          builder.setResultCode(ResultCode.OK);
+          builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
+        } else {
+          QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
+
+          // It will try to find a query status from a finished query list.
+          if (queryInProgress == null) {
+            queryInProgress = context.getQueryJobManager().getFinishedQuery(queryId);
+          }
+          if (queryInProgress != null) {
+            QueryInfo queryInfo = queryInProgress.getQueryInfo();
+            builder.setResultCode(ResultCode.OK);
+            builder.setState(queryInfo.getQueryState());
+            builder.setProgress(queryInfo.getProgress());
+            builder.setSubmitTime(queryInfo.getStartTime());
+            if(queryInfo.getQueryMasterHost() != null) {
+              builder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+              builder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
+            }
+            if (queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+              builder.setFinishTime(queryInfo.getFinishTime());
+            } else {
+              builder.setFinishTime(System.currentTimeMillis());
+            }
+          } else {
+            builder.setResultCode(ResultCode.ERROR);
+            builder.setErrorMessage("No such query: " + queryId.toString());
+          }
+        }
+        return builder.build();
+
+      } catch (Throwable t) {
+        throw new  ServiceException(t);
+      }
+    }
+
+    /**
+     * Requests that a query be killed by posting a QUERY_JOB_KILL event to the
+     * query job manager's event handler. The method answers TRUE once the event
+     * has been handed over.
+     * <p>
+     * It is invoked by TajoContainerProxy (note carried over from the original
+     * comment; the caller is not visible from this class).
+     *
+     * @throws ServiceException wrapping any failure raised while posting the event
+     */
+    @Override
+    public BoolProto killQuery(RpcController controller, KillQueryRequest request) throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+
+        QueryId targetQuery = new QueryId(request.getQueryId());
+        QueryJobManager jobManager = context.getQueryJobManager();
+        QueryJobEvent killEvent = new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL, new QueryInfo(targetQuery));
+        jobManager.getEventHandler().handle(killEvent);
+
+        return BOOL_TRUE;
+      } catch (Throwable cause) {
+        throw new ServiceException(cause);
+      }
+    }
+
+    /**
+     * Describes every worker known to the resource manager: host, ports, state,
+     * last heartbeat, and total and used resources.
+     * <p>
+     * Workers are listed in ascending order of their map key, so repeated calls
+     * return a stable ordering.
+     *
+     * @throws ServiceException wrapping any failure raised while collecting the information
+     */
+    @Override
+    public GetClusterInfoResponse getClusterInfo(RpcController controller,
+                                                 GetClusterInfoRequest request)
+        throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+        GetClusterInfoResponse.Builder builder= GetClusterInfoResponse.newBuilder();
+
+        Map<String, Worker> workers = context.getResourceManager().getWorkers();
+
+        // The sorted key list used to be computed and then ignored, with the loop
+        // walking workers.values() in map order. Iterate over the sorted keys instead.
+        List<String> workerKeys = new ArrayList<String>(workers.keySet());
+        Collections.sort(workerKeys);
+
+        // One builder is reused; every field below is overwritten on each iteration.
+        WorkerResourceInfo.Builder workerBuilder
+          = WorkerResourceInfo.newBuilder();
+
+        for (String workerKey : workerKeys) {
+          Worker worker = workers.get(workerKey);
+          if (worker == null) {
+            // The entry disappeared between taking the key snapshot and this lookup.
+            continue;
+          }
+
+          WorkerResource workerResource = worker.getResource();
+          workerBuilder.setAllocatedHost(worker.getHostName());
+          workerBuilder.setDiskSlots(workerResource.getDiskSlots());
+          workerBuilder.setCpuCoreSlots(workerResource.getCpuCoreSlots());
+          workerBuilder.setMemoryMB(workerResource.getMemoryMB());
+          workerBuilder.setLastHeartbeat(worker.getLastHeartbeatTime());
+          workerBuilder.setUsedMemoryMB(workerResource.getUsedMemoryMB());
+          workerBuilder.setUsedCpuCoreSlots(workerResource.getUsedCpuCoreSlots());
+          workerBuilder.setUsedDiskSlots(workerResource.getUsedDiskSlots());
+          workerBuilder.setWorkerStatus(worker.getState().toString());
+          workerBuilder.setQueryMasterMode(workerResource.isQueryMasterMode());
+          workerBuilder.setTaskRunnerMode(workerResource.isTaskRunnerMode());
+          workerBuilder.setPeerRpcPort(worker.getPeerRpcPort());
+          workerBuilder.setQueryMasterPort(worker.getQueryMasterPort());
+          workerBuilder.setClientPort(worker.getClientPort());
+          workerBuilder.setPullServerPort(worker.getPullServerPort());
+          workerBuilder.setHttpPort(worker.getHttpPort());
+          workerBuilder.setMaxHeap(workerResource.getMaxHeap());
+          workerBuilder.setFreeHeap(workerResource.getFreeHeap());
+          workerBuilder.setTotalHeap(workerResource.getTotalHeap());
+          workerBuilder.setNumRunningTasks(workerResource.getNumRunningTasks());
+          workerBuilder.setNumQueryMasterTasks(workerResource.getNumQueryMasterTasks());
+
+          builder.addWorkerList(workerBuilder.build());
+        }
+
+        return builder.build();
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
+    }
+
+    @Override
+    public BoolProto createDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
+      try {
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+        if (context.getGlobalEngine().createDatabase(session, request.getValue(), null, false)) {
+          return BOOL_TRUE;
+        } else {
+          return BOOL_FALSE;
+        }
+      } catch (Throwable e) {
+        throw new ServiceException(e);
+      }
+    }
+
+    @Override
+    public BoolProto existDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+        if (catalog.existDatabase(request.getValue())) {
+          return BOOL_TRUE;
+        } else {
+          return BOOL_FALSE;
+        }
+      } catch (Throwable e) {
+        throw new ServiceException(e);
+      }
+    }
+
+    @Override
+    public BoolProto dropDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
+      try {
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+        if (context.getGlobalEngine().dropDatabase(session, request.getValue(), false)) {
+          return BOOL_TRUE;
+        } else {
+          return BOOL_FALSE;
+        }
+      } catch (Throwable e) {
+        throw new ServiceException(e);
+      }
+    }
+
+    @Override
+    public PrimitiveProtos.StringListProto getAllDatabases(RpcController controller, TajoIdProtos.SessionIdProto
+        request) throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getId());
+        return ProtoUtil.convertStrings(catalog.getAllDatabaseNames());
+      } catch (Throwable e) {
+        throw new ServiceException(e);
+      }
+    }
+
+    /**
+     * Reports whether a table exists. A fully qualified name is split into its
+     * database and table parts; any other name is resolved against the session's
+     * current database.
+     *
+     * @throws ServiceException wrapping any failure raised by the session lookup or the catalog
+     */
+    @Override
+    public BoolProto existTable(RpcController controller, SessionedStringProto request) throws ServiceException {
+      try {
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+        final String requestedName = request.getValue();
+        final String databaseName;
+        final String tableName;
+        if (!CatalogUtil.isFQTableName(requestedName)) {
+          databaseName = session.getCurrentDatabase();
+          tableName = requestedName;
+        } else {
+          String[] nameParts = CatalogUtil.splitFQTableName(requestedName);
+          databaseName = nameParts[0];
+          tableName = nameParts[1];
+        }
+
+        return catalog.existsTable(databaseName, tableName) ? BOOL_TRUE : BOOL_FALSE;
+      } catch (Throwable cause) {
+        throw new ServiceException(cause);
+      }
+    }
+
+    /**
+     * Lists the tables of the database named in the request, or of the session's
+     * current database when the request names none.
+     *
+     * @throws ServiceException wrapping any failure raised by the session lookup or the catalog
+     */
+    @Override
+    public GetTableListResponse getTableList(RpcController controller,
+                                             GetTableListRequest request) throws ServiceException {
+      try {
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+        String targetDatabase = request.hasDatabaseName()
+            ? request.getDatabaseName()
+            : session.getCurrentDatabase();
+
+        Collection<String> tables = catalog.getAllTableNames(targetDatabase);
+        return GetTableListResponse.newBuilder()
+            .addAllTables(tables)
+            .build();
+      } catch (Throwable cause) {
+        throw new ServiceException(cause);
+      }
+    }
+
+    /**
+     * Fetches the description of a table. A fully qualified name is split into its
+     * database and table parts; any other name is resolved against the session's
+     * current database.
+     *
+     * @return an OK response carrying the table description, or an ERROR response
+     *         when the catalog has no such table
+     * @throws ServiceException wrapping any failure raised by the session lookup or the catalog
+     */
+    @Override
+    public TableResponse getTableDesc(RpcController controller, GetTableDescRequest request) throws ServiceException {
+      try {
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+        final String databaseName;
+        final String tableName;
+        if (!CatalogUtil.isFQTableName(request.getTableName())) {
+          databaseName = session.getCurrentDatabase();
+          tableName = request.getTableName();
+        } else {
+          String[] nameParts = CatalogUtil.splitFQTableName(request.getTableName());
+          databaseName = nameParts[0];
+          tableName = nameParts[1];
+        }
+
+        if (!catalog.existsTable(databaseName, tableName)) {
+          return TableResponse.newBuilder()
+              .setResultCode(ResultCode.ERROR)
+              .setErrorMessage("ERROR: no such a table: " + request.getTableName())
+              .build();
+        }
+
+        return TableResponse.newBuilder()
+            .setResultCode(ResultCode.OK)
+            .setTableDesc(catalog.getTableDesc(databaseName, tableName).getProto())
+            .build();
+      } catch (Throwable cause) {
+        throw new ServiceException(cause);
+      }
+    }
+
+    /**
+     * Registers an external table over an existing path.
+     * <p>
+     * The path must already exist on its file system. The schema, table meta and
+     * optional partition method are taken from the request, and the table is
+     * created through the global engine.
+     *
+     * @return an OK response carrying the new table description, or an ERROR
+     *         response when the session is invalid, the path is missing or
+     *         unreadable, or the engine fails to create the table
+     */
+    @Override
+    public TableResponse createExternalTable(RpcController controller, CreateTableRequest request)
+        throws ServiceException {
+      try {
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+        Path path = new Path(request.getPath());
+        FileSystem fs = path.getFileSystem(conf);
+
+        if (!fs.exists(path)) {
+          throw new IOException("No such a directory: " + path);
+        }
+
+        Schema schema = new Schema(request.getSchema());
+        TableMeta meta = new TableMeta(request.getMeta());
+        PartitionMethodDesc partitionDesc = null;
+        if (request.hasPartition()) {
+          partitionDesc = new PartitionMethodDesc(request.getPartition());
+        }
+
+        TableDesc desc;
+        try {
+          desc = context.getGlobalEngine().createTableOnPath(session, request.getName(), schema,
+              meta, path, true, partitionDesc, false);
+        } catch (Exception e) {
+          return tableErrorResponse(e);
+        }
+
+        return TableResponse.newBuilder()
+            .setResultCode(ResultCode.OK)
+            .setTableDesc(desc.getProto()).build();
+      } catch (InvalidSessionException ise) {
+        return tableErrorResponse(ise);
+      } catch (IOException ioe) {
+        return tableErrorResponse(ioe);
+      }
+    }
+
+    /**
+     * Builds an ERROR {@link TableResponse} for the given failure. The error message
+     * is the exception message, or the stack trace when the exception has none.
+     */
+    private TableResponse tableErrorResponse(Throwable cause) {
+      String message = cause.getMessage();
+      if (message == null) {
+        // Generated protobuf string setters throw NullPointerException on null, which
+        // would replace the intended ERROR response with an unrelated failure.
+        message = ExceptionUtils.getStackTrace(cause);
+      }
+      return TableResponse.newBuilder()
+          .setResultCode(ResultCode.ERROR)
+          .setErrorMessage(message).build();
+    }
+
+    /**
+     * Asks the global engine to drop the named table, passing on the purge flag
+     * from the request. The method answers TRUE whenever the engine call returns
+     * normally.
+     *
+     * @throws ServiceException wrapping any failure raised by the session lookup or the engine
+     */
+    @Override
+    public BoolProto dropTable(RpcController controller, DropTableRequest dropTable) throws ServiceException {
+      try {
+        String sessionId = dropTable.getSessionId().getId();
+        Session session = context.getSessionManager().getSession(sessionId);
+
+        context.getGlobalEngine().dropTable(session, dropTable.getName(), false, dropTable.getPurge());
+
+        return BOOL_TRUE;
+      } catch (Throwable cause) {
+        throw new ServiceException(cause);
+      }
+    }
+
+    /**
+     * Lists catalog functions. When the request's value field is empty, every
+     * function is returned; otherwise only the functions whose signature equals
+     * that value.
+     *
+     * @throws ServiceException wrapping any failure raised by the session touch or the catalog
+     */
+    @Override
+    public FunctionResponse getFunctionList(RpcController controller, SessionedStringProto request)
+        throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+
+        final String wantedSignature = request.getValue();
+        final boolean listEverything = wantedSignature == null || wantedSignature.isEmpty();
+
+        Collection<FunctionDesc> allFunctions = catalog.getFunctions();
+        List<CatalogProtos.FunctionDescProto> selected = new ArrayList<CatalogProtos.FunctionDescProto>();
+        for (FunctionDesc candidate : allFunctions) {
+          if (listEverything || wantedSignature.equals(candidate.getSignature())) {
+            selected.add(candidate.getProto());
+          }
+        }
+
+        return FunctionResponse.newBuilder()
+            .setResultCode(ResultCode.OK)
+            .addAllFunctions(selected)
+            .build();
+      } catch (Throwable cause) {
+        throw new ServiceException(cause);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
new file mode 100644
index 0000000..5e9f729
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Service that exposes the {@link TajoMasterProtocol} RPC endpoint of the master. It owns an
+ * {@link AsyncRpcServer} and serves every call through a {@link TajoMasterServiceHandler}, which
+ * delegates to the query job manager and the resource manager obtained from the master context.
+ */
+public class TajoMasterService extends AbstractService {
+  private final static Log LOG = LogFactory.getLog(TajoMasterService.class);
+
+  // Immutable responses shared by all handler calls.
+  private static final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
+  private static final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build();
+
+  private final TajoMaster.MasterContext context;
+  private final TajoConf conf;
+  private final TajoMasterServiceHandler masterHandler;
+  private AsyncRpcServer server;
+  private InetSocketAddress bindAddress;
+
+  /**
+   * @param context master context; its configuration is used (and later updated) by {@link #start()}
+   */
+  public TajoMasterService(TajoMaster.MasterContext context) {
+    super(TajoMasterService.class.getName());
+    this.context = context;
+    this.conf = context.getConf();
+    this.masterHandler = new TajoMasterServiceHandler();
+  }
+
+  /**
+   * Creates and starts the RPC server on the address configured under
+   * {@code TAJO_MASTER_UMBILICAL_RPC_ADDRESS}, then writes the address the server is actually
+   * reachable at back into the configuration.
+   *
+   * @throws RuntimeException if the RPC server cannot be created; previously the failure was only
+   *                          logged and the following {@code server.start()} died with an
+   *                          unrelated NullPointerException
+   */
+  @Override
+  public void start() {
+    String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+    InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+    int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
+    try {
+      server = new AsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa, workerNum);
+    } catch (Exception e) {
+      // Log with the stack trace and fail fast: nothing below can work without a server.
+      LOG.error("Failed to create the TajoMasterService RPC server at " + initIsa, e);
+      throw new RuntimeException(e);
+    }
+    server.start();
+    bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
+    this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
+        NetUtils.normalizeInetSocketAddress(bindAddress));
+    LOG.info("Instantiated TajoMasterService at " + this.bindAddress);
+    super.start();
+  }
+
+  /**
+   * Shuts the RPC server down (if one was created) and stops the service. The server reference is
+   * cleared, so calling this more than once does not shut the server down twice.
+   */
+  @Override
+  public void stop() {
+    if(server != null) {
+      server.shutdown();
+      server = null;
+    }
+    super.stop();
+  }
+
+  /**
+   * @return the address computed in {@link #start()}, or {@code null} if the service has not been
+   *         started yet
+   */
+  public InetSocketAddress getBindAddress() {
+    return bindAddress;
+  }
+
+  /**
+   * Handler for the asynchronous {@link TajoMasterProtocol} calls. Every method answers through
+   * the supplied {@link RpcCallback}.
+   */
+  public class TajoMasterServiceHandler
+      implements TajoMasterProtocol.TajoMasterProtocolService.Interface {
+    /**
+     * Forwards the heartbeat to the query job manager and replies with a positive heartbeat
+     * result, the command returned by the job manager (if any) and the current cluster resource
+     * summary.
+     */
+    @Override
+    public void heartbeat(
+        RpcController controller,
+        TajoMasterProtocol.TajoHeartbeat request, RpcCallback<TajoMasterProtocol.TajoHeartbeatResponse> done) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Received QueryHeartbeat:" + request.getTajoWorkerHost() + ":" + request.getTajoQueryMasterPort());
+      }
+
+      QueryJobManager queryJobManager = context.getQueryJobManager();
+      TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand command =
+          queryJobManager.queryHeartbeat(request);
+
+      TajoMasterProtocol.TajoHeartbeatResponse.Builder builder = TajoMasterProtocol.TajoHeartbeatResponse.newBuilder();
+      builder.setHeartbeatResult(BOOL_TRUE);
+      // The job manager may have nothing to tell the sender; the command is then left unset.
+      if(command != null) {
+        builder.setResponseCommand(command);
+      }
+
+      builder.setClusterResourceSummary(context.getResourceManager().getClusterResourceSummary());
+      done.run(builder.build());
+    }
+
+    /**
+     * Hands the allocation request, together with the callback, to the resource manager, which is
+     * responsible for answering it.
+     */
+    @Override
+    public void allocateWorkerResources(
+        RpcController controller,
+        TajoMasterProtocol.WorkerResourceAllocationRequest request,
+        RpcCallback<TajoMasterProtocol.WorkerResourceAllocationResponse> done) {
+      context.getResourceManager().allocateWorkerResources(request, done);
+    }
+
+    /**
+     * Releases every container listed in the request through the resource manager and then
+     * replies with {@code true}.
+     */
+    @Override
+    public void releaseWorkerResource(RpcController controller,
+                                           TajoMasterProtocol.WorkerResourceReleaseRequest request,
+                                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+      List<YarnProtos.ContainerIdProto> containerIds = request.getContainerIdsList();
+
+      for(YarnProtos.ContainerIdProto eachContainer: containerIds) {
+        context.getResourceManager().releaseWorkerResource(eachContainer);
+      }
+      done.run(BOOL_TRUE);
+    }
+
+    /**
+     * Asks the query job manager to stop the query identified by the request and replies with
+     * {@code true}.
+     */
+    @Override
+    public void stopQueryMaster(RpcController controller, TajoIdProtos.QueryIdProto request,
+                                RpcCallback<BoolProto> done) {
+      context.getQueryJobManager().stopQuery(new QueryId(request));
+      done.run(BOOL_TRUE);
+    }
+
+    /**
+     * Replies with one {@code WorkerResourceProto} per worker currently known to the resource
+     * manager, carrying its host, ports, memory and disk slots.
+     */
+    @Override
+    public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
+                                     RpcCallback<TajoMasterProtocol.WorkerResourcesRequest> done) {
+
+      TajoMasterProtocol.WorkerResourcesRequest.Builder builder =
+          TajoMasterProtocol.WorkerResourcesRequest.newBuilder();
+      Collection<Worker> workers = context.getResourceManager().getWorkers().values();
+
+      for(Worker worker: workers) {
+        WorkerResource resource = worker.getResource();
+
+        TajoMasterProtocol.WorkerResourceProto.Builder workerResource =
+            TajoMasterProtocol.WorkerResourceProto.newBuilder();
+
+        workerResource.setHost(worker.getHostName());
+        workerResource.setPeerRpcPort(worker.getPeerRpcPort());
+        workerResource.setInfoPort(worker.getHttpPort());
+        workerResource.setQueryMasterPort(worker.getQueryMasterPort());
+        workerResource.setMemoryMB(resource.getMemoryMB());
+        workerResource.setDiskSlots(resource.getDiskSlots());
+
+        builder.addWorkerResources(workerResource);
+      }
+      done.run(builder.build());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
new file mode 100644
index 0000000..1e6655c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+
+import java.util.Collection;
+
+/**
+ * Event that pairs an execution block id with a collection of YARN containers. The
+ * {@link EventType} passed at construction time tells the receiver what to do with them.
+ */
+public class TaskRunnerGroupEvent extends AbstractEvent<EventType> {
+
+  /** Kinds of {@link TaskRunnerGroupEvent}. */
+  public enum EventType {
+    CONTAINER_REMOTE_LAUNCH,
+    CONTAINER_REMOTE_CLEANUP
+  }
+
+  protected final ExecutionBlockId executionBlockId;
+  protected final Collection<Container> containers;
+
+  /**
+   * @param eventType        kind of event, handed to {@link AbstractEvent}
+   * @param executionBlockId id returned by {@link #getExecutionBlockId()}
+   * @param containers       containers returned by {@link #getContainers()}; stored by
+   *                         reference, not copied
+   */
+  public TaskRunnerGroupEvent(EventType eventType,
+                              ExecutionBlockId executionBlockId,
+                              Collection<Container> containers) {
+    super(eventType);
+    this.containers = containers;
+    this.executionBlockId = executionBlockId;
+  }
+
+  /** @return the execution block id given to the constructor */
+  public ExecutionBlockId getExecutionBlockId() {
+    return this.executionBlockId;
+  }
+
+  /** @return the very collection instance given to the constructor */
+  public Collection<Container> getContainers() {
+    return this.containers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerLauncher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerLauncher.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerLauncher.java
new file mode 100644
index 0000000..9086e65
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerLauncher.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+
+/**
+ * Marker type for event handlers that consume {@link TaskRunnerGroupEvent}s. It declares no
+ * methods of its own; everything comes from {@link EventHandler}.
+ */
+public interface TaskRunnerLauncher extends EventHandler<TaskRunnerGroupEvent> {
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java
new file mode 100644
index 0000000..755df5a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+
+/**
+ * Plain holder for the values handed to a task scheduler: the query master task context, the
+ * execution block id, a leaf-query flag, and two integers (task size and estimated task number)
+ * that are filled in through setters after construction.
+ */
+public class TaskSchedulerContext {
+  // Assigned only in the constructor.
+  private QueryMasterTask.QueryMasterTaskContext masterContext;
+  private boolean isLeafQuery;
+  private ExecutionBlockId blockId;
+
+  // Assigned through the setters below; both are 0 until then.
+  private int taskSize;
+  private int estimatedTaskNum;
+
+  /**
+   * @param masterContext context returned by {@link #getMasterContext()}
+   * @param isLeafQuery   flag returned by {@link #isLeafQuery()}
+   * @param blockId       id returned by {@link #getBlockId()}
+   */
+  public TaskSchedulerContext(QueryMasterTask.QueryMasterTaskContext masterContext, boolean isLeafQuery,
+                              ExecutionBlockId blockId) {
+    this.blockId = blockId;
+    this.isLeafQuery = isLeafQuery;
+    this.masterContext = masterContext;
+  }
+
+  /** @return the query master task context given to the constructor */
+  public QueryMasterTask.QueryMasterTaskContext getMasterContext() {
+    return this.masterContext;
+  }
+
+  /** @return the leaf-query flag given to the constructor */
+  public boolean isLeafQuery() {
+    return this.isLeafQuery;
+  }
+
+  /** @return the execution block id given to the constructor */
+  public ExecutionBlockId getBlockId() {
+    return this.blockId;
+  }
+
+  /** @return the last value passed to {@link #setTaskSize(int)}, or 0 if never set */
+  public int getTaskSize() {
+    return this.taskSize;
+  }
+
+  /** @param taskSize value subsequently returned by {@link #getTaskSize()} */
+  public void setTaskSize(int taskSize) {
+    this.taskSize = taskSize;
+  }
+
+  /** @return the last value passed to {@link #setEstimatedTaskNum(int)}, or 0 if never set */
+  public int getEstimatedTaskNum() {
+    return this.estimatedTaskNum;
+  }
+
+  /** @param estimatedTaskNum value subsequently returned by {@link #getEstimatedTaskNum()} */
+  public void setEstimatedTaskNum(int estimatedTaskNum) {
+    this.estimatedTaskNum = estimatedTaskNum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
new file mode 100644
index 0000000..520ecd3
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.master.querymaster.SubQuery;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+/**
+ * Creates {@link AbstractTaskScheduler} instances reflectively. The scheduler class is read from
+ * the configuration key {@code tajo.querymaster.task-scheduler} and instantiated through its
+ * {@code (TaskSchedulerContext, SubQuery)} constructor.
+ */
+public class TaskSchedulerFactory {
+  // volatile so that a class resolved by one thread is visible to the others; the field is read
+  // into a local exactly once per call so a caller can never observe a half-updated value.
+  private static volatile Class<? extends AbstractTaskScheduler> CACHED_ALGORITHM_CLASS;
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
+  private static final Class<?>[] DEFAULT_PARAMS = { TaskSchedulerContext.class, SubQuery.class };
+
+  /**
+   * Resolves the configured scheduler class. The first successful lookup is cached for the
+   * lifetime of the JVM, so later calls ignore {@code conf}.
+   *
+   * @param conf configuration consulted for {@code tajo.querymaster.task-scheduler}
+   * @return the scheduler class, never {@code null}
+   * @throws IOException if the configuration does not name a scheduler class
+   */
+  public static Class<? extends AbstractTaskScheduler> getTaskSchedulerClass(Configuration conf)
+      throws IOException {
+    Class<? extends AbstractTaskScheduler> schedulerClass = CACHED_ALGORITHM_CLASS;
+    if (schedulerClass == null) {
+      schedulerClass = conf.getClass("tajo.querymaster.task-scheduler", null, AbstractTaskScheduler.class);
+      if (schedulerClass == null) {
+        throw new IOException("Task scheduler is null");
+      }
+      // Only a successful lookup is published to the cache.
+      CACHED_ALGORITHM_CLASS = schedulerClass;
+    }
+    return schedulerClass;
+  }
+
+  /**
+   * Instantiates {@code clazz} through its {@code (TaskSchedulerContext, SubQuery)} constructor,
+   * which is made accessible and cached per class.
+   *
+   * @param clazz    scheduler implementation to instantiate
+   * @param context  first constructor argument
+   * @param subQuery second constructor argument
+   * @return the new scheduler
+   * @throws RuntimeException wrapping any exception raised while looking up or invoking the
+   *                          constructor
+   */
+  public static <T extends AbstractTaskScheduler> T get(Class<T> clazz, TaskSchedulerContext context,
+                                                        SubQuery subQuery) {
+    try {
+      // Safe: the cache only ever stores the Constructor obtained from the class used as its key.
+      @SuppressWarnings("unchecked")
+      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
+      if (constructor == null) {
+        constructor = clazz.getDeclaredConstructor(DEFAULT_PARAMS);
+        constructor.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(clazz, constructor);
+      }
+      return constructor.newInstance(context, subQuery);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Convenience overload: resolves the scheduler class from {@code conf} and instantiates it.
+   *
+   * @throws IOException if the configuration does not name a scheduler class
+   */
+  public static AbstractTaskScheduler get(Configuration conf, TaskSchedulerContext context, SubQuery subQuery)
+      throws IOException {
+    return get(getTaskSchedulerClass(conf), context, subQuery);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/TaskState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskState.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskState.java
new file mode 100644
index 0000000..67d2ebc
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TaskState.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+public enum TaskState {
+  NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/YarnContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
new file mode 100644
index 0000000..4f178fb
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.pullserver.PullServerAuxService;
+import org.apache.tajo.worker.TajoWorker;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.*;
+
+public class YarnContainerProxy extends ContainerProxy {
+  // Shared factory for YARN record instances; used by createApplicationResource.
+  private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+  // RPC factory supplied by the caller; used to obtain and stop ContainerManagementProtocol proxies.
+  protected final YarnRPC yarnRPC;
+  // "host:port" string built in the constructor from the container's NodeId.
+  final protected String containerMgrAddress;
+  // Token read from the container in the constructor and passed to getCMProxy on launch.
+  protected Token containerToken;
+
+  public YarnContainerProxy(QueryMasterTask.QueryMasterTaskContext context, Configuration conf, YarnRPC yarnRPC,
+                                  Container container, ExecutionBlockId executionBlockId) {
+    super(context, conf, executionBlockId, container);
+    this.yarnRPC = yarnRPC;
+
+    NodeId nodeId = container.getNodeId();
+    this.containerMgrAddress = nodeId.getHost() + ":" + nodeId.getPort();
+    this.containerToken = container.getContainerToken();
+  }
+
+  protected ContainerManagementProtocol getCMProxy(ContainerId containerID,
+                                                   final String containerManagerBindAddr,
+                                                   Token containerToken)
+      throws IOException {
+    String [] hosts = containerManagerBindAddr.split(":");
+    final InetSocketAddress cmAddr =
+        new InetSocketAddress(hosts[0], Integer.parseInt(hosts[1]));
+    UserGroupInformation user = UserGroupInformation.getCurrentUser();
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      org.apache.hadoop.security.token.Token<ContainerTokenIdentifier> token =
+          ConverterUtils.convertFromYarn(containerToken, cmAddr);
+      // the user in createRemoteUser in this context has to be ContainerID
+      user = UserGroupInformation.createRemoteUser(containerID.toString());
+      user.addToken(token);
+    }
+
+    ContainerManagementProtocol proxy = user.doAs(new PrivilegedAction<ContainerManagementProtocol>() {
+      @Override
+      public ContainerManagementProtocol run() {
+        return (ContainerManagementProtocol) yarnRPC.getProxy(ContainerManagementProtocol.class,
+            cmAddr, conf);
+      }
+    });
+
+    return proxy;
+  }
+
+  /**
+   * Launches this container through its container manager.
+   *
+   * A container already marked KILLED_BEFORE_LAUNCH is moved to DONE and nothing is started.
+   * Otherwise a single start request is sent. The pull server port is read from the response
+   * when the response carries pull server metadata. On success the proxy becomes RUNNING and
+   * registers itself with the resource allocator. Any Throwable, including a negative port,
+   * leaves the state FAILED and is logged rather than rethrown. The RPC proxy is always stopped.
+   *
+   * @param commonContainerLaunchContext shared launch context that is specialized for this container
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public synchronized void launch(ContainerLaunchContext commonContainerLaunchContext) {
+    LOG.info("Launching Container with Id: " + containerID);
+    if (ContainerState.KILLED_BEFORE_LAUNCH == this.state) {
+      this.state = ContainerState.DONE;
+      LOG.error("Container (" + containerID + " was killed before it was launched");
+      return;
+    }
+
+    ContainerManagementProtocol cmProxy = null;
+    try {
+      cmProxy = getCMProxy(containerID, containerMgrAddress, containerToken);
+
+      // Specialize the shared launch context for this particular container.
+      ContainerLaunchContext ownLaunchContext = createContainerLaunchContext(commonContainerLaunchContext);
+
+      // Wrap it into a batch request holding exactly one start request.
+      List<StartContainerRequest> batch = new ArrayList<StartContainerRequest>();
+      StartContainerRequest single = Records.newRecord(StartContainerRequest.class);
+      single.setContainerLaunchContext(ownLaunchContext);
+      batch.add(single);
+      StartContainersRequest batchRequest = Records.newRecord(StartContainersRequest.class);
+      batchRequest.setStartContainerRequests(batch);
+
+      StartContainersResponse startResponse = cmProxy.startContainers(batchRequest);
+
+      // The pull server port is only updated when its metadata is present in the response.
+      ByteBuffer pullServerMeta =
+          startResponse.getAllServicesMetaData().get(PullServerAuxService.PULLSERVER_SERVICEID);
+      if (pullServerMeta != null) {
+        port = PullServerAuxService.deserializeMetaData(pullServerMeta);
+      }
+
+      LOG.info("PullServer port returned by ContainerManager for " + containerID + " : " + port);
+
+      if (port < 0) {
+        this.state = ContainerState.FAILED;
+        throw new IllegalStateException("Invalid shuffle port number " + port + " returned for " + containerID);
+      }
+
+      this.state = ContainerState.RUNNING;
+      this.hostName = containerMgrAddress.split(":")[0];
+      context.getResourceAllocator().addContainer(containerID, this);
+    } catch (Throwable failure) {
+      String message = "Container launch failed for " + containerID + " : "
+          + StringUtils.stringifyException(failure);
+      this.state = ContainerState.FAILED;
+      LOG.error(message);
+    } finally {
+      if (cmProxy != null) {
+        yarnRPC.stopProxy(cmProxy, conf);
+      }
+    }
+  }
+
+
+  /**
+   * Derives the launch context of this container from the shared one. The environment is copied,
+   * every service data buffer is duplicated, and a single shell command line is assembled that
+   * starts {@link TajoWorker} in "tr" mode with stdout/stderr redirected into the container log
+   * directory.
+   *
+   * @param commonContainerLaunchContext shared context providing environment, service data and
+   *                                     local resources
+   * @return a new launch context for this container
+   */
+  public ContainerLaunchContext createContainerLaunchContext(ContainerLaunchContext commonContainerLaunchContext) {
+    // Private copy of the shared environment.
+    Map<String, String> sharedEnv = commonContainerLaunchContext.getEnvironment();
+    Map<String, String> ownEnv = new HashMap<String, String>(sharedEnv.size());
+    ownEnv.putAll(sharedEnv);
+
+    // Duplicate the ByteBuffers for access by multiple containers.
+    Map<String, ByteBuffer> ownServiceData = new HashMap<String, ByteBuffer>();
+    for (Map.Entry<String, ByteBuffer> shared : commonContainerLaunchContext.getServiceData().entrySet()) {
+      ownServiceData.put(shared.getKey(), shared.getValue().duplicate());
+    }
+
+    // Command line pieces, in order: JVM, heap limit, main class, worker arguments.
+    // (Remote debugging could be enabled here by adding
+    //  "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005".)
+    List<CharSequence> pieces = new ArrayList<CharSequence>(30);
+    pieces.add("${JAVA_HOME}/bin/java");
+    pieces.add("-Xmx2000m");
+    pieces.add(TajoWorker.class.getCanonicalName());
+    pieces.add("tr");                       // workerMode
+    pieces.add(getId());                    // subqueryId
+    pieces.add(containerMgrAddress);        // nodeId
+    pieces.add(containerID.toString());     // containerId
+
+    Vector<CharSequence> taskParams = getTaskParams();
+    if (taskParams != null) {
+      pieces.addAll(taskParams);
+    }
+
+    pieces.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+    pieces.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+    // Get final command: every piece is followed by one space, including the last.
+    StringBuilder joined = new StringBuilder();
+    for (CharSequence piece : pieces) {
+      joined.append(piece);
+      joined.append(" ");
+    }
+    final String commandLine = joined.toString();
+
+    LOG.info("Completed setting up TaskRunner command " + commandLine);
+    List<String> commands = new ArrayList<String>();
+    commands.add(commandLine);
+
+    return BuilderUtils.newContainerLaunchContext(
+        commonContainerLaunchContext.getLocalResources(),
+        ownEnv,
+        commands,
+        ownServiceData,
+        null,
+        new HashMap<ApplicationAccessType, String>());
+  }
+
+  /**
+   * Builds the launch context shared by all containers: user token, environment variables
+   * (including the classpath), the system configuration file as a local resource, and the pull
+   * server service data.
+   *
+   * Failures while resolving the current user, registering the local resource or serializing the
+   * service data are logged and the context is still returned without the affected part.
+   *
+   * @param config   configuration; must be a {@link TajoConf}, otherwise a ClassCastException is thrown
+   * @param queryId  not used by this method
+   * @param isMaster not used by this method
+   * @return the populated launch context
+   */
+  public static ContainerLaunchContext createCommonContainerLaunchContext(Configuration config,
+                                                                          String queryId, boolean isMaster) {
+    TajoConf conf = (TajoConf)config;
+
+    ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+
+    try {
+      ByteBuffer userToken = ByteBuffer.wrap(UserGroupInformation.getCurrentUser().getShortUserName().getBytes());
+      ctx.setTokens(userToken);
+    } catch (IOException e) {
+      // Best effort, as before, but reported through the logger instead of stderr.
+      LOG.error("Failed to resolve the current user for the container tokens", e);
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Set the env variables to be setup
+    ////////////////////////////////////////////////////////////////////////////
+    LOG.info("Set the environment for the application master");
+
+    Map<String, String> environment = new HashMap<String, String>();
+    environment.put(ApplicationConstants.Environment.SHELL.name(), "/bin/bash");
+    String javaHome = System.getenv(ApplicationConstants.Environment.JAVA_HOME.name());
+    if(javaHome != null) {
+      environment.put(ApplicationConstants.Environment.JAVA_HOME.name(), javaHome);
+    }
+
+    // TODO - to be improved with org.apache.tajo.sh shell script
+    Properties prop = System.getProperties();
+    String testEnv = System.getenv("tajo.test");
+
+    if (prop.getProperty("tajo.test", "FALSE").equalsIgnoreCase("TRUE") ||
+        (testEnv != null && testEnv.equalsIgnoreCase("TRUE"))) {
+      // Test mode: containers simply reuse the classpath of the current JVM.
+      LOG.info("tajo.test is TRUE");
+      environment.put(ApplicationConstants.Environment.CLASSPATH.name(), prop.getProperty("java.class.path", null));
+      environment.put("tajo.test", "TRUE");
+    } else {
+      // Add AppMaster.jar location to classpath
+      // At some point we should not be required to add
+      // the hadoop specific classpaths to the env.
+      // It should be provided out of the box.
+      // For now setting all required classpaths including
+      // the classpath to "." for the application jar
+      StringBuilder classPathEnv = new StringBuilder("./");
+      for (String c : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH) {
+        classPathEnv.append(':');
+        classPathEnv.append(c.trim());
+      }
+
+      // Only append TAJO_BASE_CLASSPATH when it is set; the old code appended the literal
+      // string "null" to the classpath otherwise.
+      String tajoBaseClasspath = System.getenv("TAJO_BASE_CLASSPATH");
+      if(tajoBaseClasspath != null) {
+        classPathEnv.append(':').append(tajoBaseClasspath);
+      }
+      classPathEnv.append(":./log4j.properties:./*");
+
+      String hadoopHome = System.getenv("HADOOP_HOME");
+      if(hadoopHome != null) {
+        // All Hadoop component homes are pointed at the same HADOOP_HOME.
+        environment.put("HADOOP_HOME", hadoopHome);
+        environment.put(ApplicationConstants.Environment.HADOOP_COMMON_HOME.name(), hadoopHome);
+        environment.put(ApplicationConstants.Environment.HADOOP_HDFS_HOME.name(), hadoopHome);
+        environment.put(ApplicationConstants.Environment.HADOOP_YARN_HOME.name(), hadoopHome);
+      }
+
+      if(tajoBaseClasspath != null) {
+        environment.put("TAJO_BASE_CLASSPATH", tajoBaseClasspath);
+      }
+      environment.put(ApplicationConstants.Environment.CLASSPATH.name(), classPathEnv.toString());
+    }
+
+    ctx.setEnvironment(environment);
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("=================================================");
+      for(Map.Entry<String, String> entry: environment.entrySet()) {
+        LOG.debug(entry.getKey() + "=" + entry.getValue());
+      }
+      LOG.debug("=================================================");
+    }
+    ////////////////////////////////////////////////////////////////////////////
+    // Set the local resources
+    ////////////////////////////////////////////////////////////////////////////
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+    LOG.info("defaultFS: " + conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
+
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      FileContext fsCtx = FileContext.getFileContext(conf);
+      Path systemConfPath = TajoConf.getSystemConfPath(conf);
+      if (!fs.exists(systemConfPath)) {
+        LOG.error("system_conf.xml (" + systemConfPath.toString() + ") Not Found");
+      }
+      LocalResource systemConfResource = createApplicationResource(fsCtx, systemConfPath, LocalResourceType.FILE);
+      localResources.put(TajoConstants.SYSTEM_CONF_FILENAME, systemConfResource);
+      ctx.setLocalResources(localResources);
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+    }
+
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+    try {
+      serviceData.put(PullServerAuxService.PULLSERVER_SERVICEID, PullServerAuxService.serializeMetaData(0));
+    } catch (IOException ioe) {
+      // Keep the stack trace, consistent with the local-resource failure handling above.
+      LOG.error(ioe.getMessage(), ioe);
+    }
+    ctx.setServiceData(serviceData);
+
+    return ctx;
+  }
+
+  /**
+   * Builds an application-visible {@link LocalResource} record for a path.
+   * The URL, size and timestamp come from the path's current file status.
+   *
+   * @param fs   file context used to look up and resolve the path
+   * @param p    path of the resource
+   * @param type local resource type to record
+   * @return the populated resource record
+   * @throws IOException if the file status lookup or the path resolution fails
+   */
+  private static LocalResource createApplicationResource(FileContext fs,
+                                                         Path p, LocalResourceType type)
+      throws IOException {
+    final LocalResource resource = recordFactory.newRecordInstance(LocalResource.class);
+    final FileStatus status = fs.getFileStatus(p);
+    final Path resolved = fs.getDefaultFileSystem().resolvePath(status.getPath());
+
+    resource.setResource(ConverterUtils.getYarnUrlFromPath(resolved));
+    resource.setSize(status.getLen());
+    resource.setTimestamp(status.getModificationTime());
+    resource.setType(type);
+    resource.setVisibility(LocalResourceVisibility.APPLICATION);
+    return resource;
+  }
+
+  /**
+   * Writes the configuration as XML to {@code queryConfFile}, creating the file
+   * with the {@code QUERYCONF_FILE_PERMISSION} permission.
+   *
+   * @param conf          configuration to serialize; also used to resolve the file system
+   * @param queryConfFile destination file
+   * @throws IOException if the file cannot be created, written or closed
+   */
+  private static void writeConf(Configuration conf, Path queryConfFile)
+      throws IOException {
+    final FileSystem targetFs = queryConfFile.getFileSystem(conf);
+    final FsPermission permission = new FsPermission(QUERYCONF_FILE_PERMISSION);
+    final FSDataOutputStream stream = FileSystem.create(targetFs, queryConfFile, permission);
+    try {
+      conf.writeXml(stream);
+    } finally {
+      // release the stream even when serialization fails
+      stream.close();
+    }
+  }
+
+  /**
+   * Stops this container unless it is already completely done.
+   * A container still in {@code PREP} is only marked {@code KILLED_BEFORE_LAUNCH}.
+   * Otherwise a stop request is sent through the container management proxy and the
+   * container is removed from the resource allocator. A failing stop request is logged
+   * and otherwise ignored; in both cases the state ends up as {@code DONE}.
+   */
+  @Override
+  public synchronized void stopContainer() {
+    if (isCompletelyDone()) {
+      return;
+    }
+
+    if (this.state == ContainerState.PREP) {
+      this.state = ContainerState.KILLED_BEFORE_LAUNCH;
+      return;
+    }
+
+    LOG.info("KILLING " + containerID);
+
+    ContainerManagementProtocol cmProxy = null;
+    try {
+      cmProxy = getCMProxy(this.containerID, this.containerMgrAddress,
+          this.containerToken);
+
+      // kill the remote container if already launched
+      List<ContainerId> targetIds = new ArrayList<ContainerId>();
+      targetIds.add(this.containerID);
+      StopContainersRequest request = Records.newRecord(StopContainersRequest.class);
+      request.setContainerIds(targetIds);
+      cmProxy.stopContainers(request);
+
+      // The call returned without an error, so assume the stop made it over
+      // to the NodeManager and forget the container.
+      context.getResourceAllocator().removeContainer(containerID);
+    } catch (Throwable t) {
+      // ignore the cleanup failure, but leave a trace of it
+      String failure = "cleanup failed for container "
+          + this.containerID + " : "
+          + StringUtils.stringifyException(t);
+      LOG.warn(failure);
+      this.state = ContainerState.DONE;
+      return;
+    } finally {
+      if (cmProxy != null) {
+        yarnRPC.stopProxy(cmProxy, conf);
+      }
+    }
+
+    this.state = ContainerState.DONE;
+  }
+
+  /**
+   * Assembles the task parameters: the query master's bind host name, its bind port
+   * and the staging directory, in that order.
+   *
+   * @return a new vector holding the three parameters
+   */
+  protected Vector<CharSequence> getTaskParams() {
+    final Vector<CharSequence> params = new Vector<CharSequence>();
+
+    // queryMaster hostname
+    params.add(context.getQueryMasterContext().getWorkerContext()
+        .getTajoWorkerManagerService().getBindAddr().getHostName());
+    // queryMaster port
+    params.add(String.valueOf(context.getQueryMasterContext().getWorkerContext()
+        .getTajoWorkerManagerService().getBindAddr().getPort()));
+    // staging directory
+    params.add(context.getStagingDir().toString());
+
+    return params;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java b/tajo-core/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
new file mode 100644
index 0000000..8b18b5a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * {@link TaskRunnerLauncher} that launches and stops task runner containers through
+ * {@link YarnContainerProxy} instances. Requests arrive as {@link TaskRunnerGroupEvent}s
+ * and every container operation is executed on a fixed-size thread pool whose size is
+ * read from {@code TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM}.
+ */
+public class YarnTaskRunnerLauncherImpl extends AbstractService implements TaskRunnerLauncher {
+
+  /** Class Logger */
+  private static final Log LOG = LogFactory.getLog(YarnTaskRunnerLauncherImpl.class);
+  private final static RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+  private final QueryMasterTask.QueryMasterTaskContext context;
+
+  // For ContainerLauncherSpec
+  private static final AtomicBoolean initialClasspathFlag = new AtomicBoolean();
+  private static String initialClasspath = null;
+  private static final Object classpathLock = new Object();
+  /** Launch context shared by the containers of the most recent launch request. */
+  private ContainerLaunchContext commonContainerSpec = null;
+
+  final public static FsPermission QUERYCONF_FILE_PERMISSION =
+      FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+  /** for launching TaskRunners in parallel */
+  private final ExecutorService executorService;
+
+  private final YarnRPC yarnRPC;
+
+  /**
+   * @param context context of the owning query master task; supplies the configuration
+   *                and the resource allocator
+   * @param yarnRPC RPC factory handed to every created {@link YarnContainerProxy}
+   */
+  public YarnTaskRunnerLauncherImpl(QueryMasterTask.QueryMasterTaskContext context, YarnRPC yarnRPC) {
+    super(YarnTaskRunnerLauncherImpl.class.getName());
+    this.context = context;
+    this.yarnRPC = yarnRPC;
+    executorService = Executors.newFixedThreadPool(
+        context.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  /**
+   * Shuts the thread pool down and stops every container currently known to the
+   * resource allocator. A container that fails to stop is logged and does not prevent
+   * the remaining containers from being stopped.
+   */
+  @Override
+  public void stop() {
+    executorService.shutdownNow();
+
+    Map<ContainerId, ContainerProxy> containers = context.getResourceAllocator().getContainers();
+    // Work on a snapshot: stopping a container can remove it from the allocator.
+    List<ContainerProxy> list = new ArrayList<ContainerProxy>(containers.values());
+    for (ContainerProxy eachProxy : list) {
+      try {
+        eachProxy.stopContainer();
+      } catch (Exception e) {
+        // best effort: keep going, but do not hide the failure
+        LOG.warn("Failed to stop a container while stopping the launcher: " + e.getMessage(), e);
+      }
+    }
+    super.stop();
+  }
+
+  /**
+   * Dispatches launch and cleanup requests; other event types are ignored.
+   */
+  @Override
+  public void handle(TaskRunnerGroupEvent event) {
+    if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
+      launchTaskRunners(event.executionBlockId, event.getContainers());
+    } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
+      stopTaskRunners(event.getContainers());
+    }
+  }
+
+  /**
+   * Creates the common launch context for the query and submits one launch job per container.
+   */
+  private void launchTaskRunners(ExecutionBlockId executionBlockId, Collection<Container> containers) {
+    commonContainerSpec = YarnContainerProxy.createCommonContainerLaunchContext(getConfig(),
+        executionBlockId.getQueryId().toString(), false);
+    for (Container container : containers) {
+      final ContainerProxy proxy = new YarnContainerProxy(context, getConfig(),
+          yarnRPC, container, executionBlockId);
+      executorService.submit(new LaunchRunner(container.getId(), proxy));
+    }
+  }
+
+  /** Job that launches a single container through its proxy. */
+  protected class LaunchRunner implements Runnable {
+    private final ContainerProxy proxy;
+    private final ContainerId id;
+    public LaunchRunner(ContainerId id, ContainerProxy proxy) {
+      this.proxy = proxy;
+      this.id = id;
+    }
+    @Override
+    public void run() {
+      try {
+        proxy.launch(commonContainerSpec);
+        LOG.info("ContainerProxy started:" + id);
+      } catch (Throwable t) {
+        // The Future returned by submit() is never inspected, so log the failure here.
+        LOG.error("Failed to launch ContainerProxy:" + id, t);
+      }
+    }
+  }
+
+  /**
+   * Submits one stop job per container that still has a proxy in the resource allocator.
+   */
+  private void stopTaskRunners(Collection<Container> containers) {
+    for (Container container : containers) {
+      final ContainerProxy proxy = context.getResourceAllocator().getContainer(container.getId());
+      if (proxy == null) {
+        // Defensive: without a proxy there is nothing to stop (it may already be removed).
+        LOG.warn("No ContainerProxy found for " + container.getId() + ", skipping stop");
+        continue;
+      }
+      executorService.submit(new StopContainerRunner(container.getId(), proxy));
+    }
+  }
+
+  /** Job that stops a single container through its proxy. */
+  private static class StopContainerRunner implements Runnable {
+    private final ContainerProxy proxy;
+    private final ContainerId id;
+    public StopContainerRunner(ContainerId id, ContainerProxy proxy) {
+      this.id = id;
+      this.proxy = proxy;
+    }
+
+    @Override
+    public void run() {
+      try {
+        proxy.stopContainer();
+        LOG.info("ContainerProxy stopped:" + id);
+      } catch (Throwable t) {
+        // The Future returned by submit() is never inspected, so log the failure here.
+        LOG.error("Failed to stop ContainerProxy:" + id, t);
+      }
+    }
+  }
+
+
+  /**
+   * Lock this on initialClasspath so that there is only one fork in the AM for
+   * getting the initial class-path. TODO: We already construct
+   * a parent CLC and use it for all the containers, so this should go away
+   * once the mr-generated-classpath stuff is gone.
+   *
+   * NOTE: the environment map consulted here is always empty, so the cached
+   * class-path is currently always {@code null}.
+   */
+  private static String getInitialClasspath(Configuration conf) {
+    synchronized (classpathLock) {
+      if (initialClasspathFlag.get()) {
+        return initialClasspath;
+      }
+      Map<String, String> env = new HashMap<String, String>();
+
+      initialClasspath = env.get(Environment.CLASSPATH.name());
+      initialClasspathFlag.set(true);
+      return initialClasspath;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/cluster/ServerName.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/cluster/ServerName.java b/tajo-core/src/main/java/org/apache/tajo/master/cluster/ServerName.java
new file mode 100644
index 0000000..028af65
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/cluster/ServerName.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.cluster;
+
+/**
+ * Immutable pair of a hostname and a port whose text form is {@code hostname:port}.
+ * Ordering, equality and hashing treat the hostname case-insensitively.
+ */
+public class ServerName implements Comparable<ServerName> {
+  /**
+   * This character is used as separator between server hostname and port.
+   */
+  public static final String SERVERNAME_SEPARATOR = ":";
+
+  private final String serverName;
+  private final String hostname;
+  private final int port;
+
+  /**
+   * @param hostname host part of the server name
+   * @param port     port part of the server name
+   */
+  public ServerName(final String hostname, final int port) {
+    this.hostname = hostname;
+    this.port = port;
+    this.serverName = getServerName(hostname, port);
+  }
+
+  /**
+   * @param serverName text of the form {@code hostname:port}
+   * @throws IllegalArgumentException if the text is null, empty, has no port,
+   *                                  or the port is not a number
+   */
+  public ServerName(final String serverName) {
+    this(parseHostname(serverName), parsePort(serverName));
+  }
+
+  /**
+   * Factory equivalent of {@link #ServerName(String)}.
+   */
+  public static ServerName create(final String serverName) {
+    return new ServerName(serverName);
+  }
+
+  /**
+   * Parses {@code hostname[:port]}, falling back to {@code defaultPort} when the
+   * text carries no port.
+   *
+   * @param serverName  hostname, optionally followed by the separator and a port
+   * @param defaultPort port to use when {@code serverName} contains no separator
+   * @return the parsed server name
+   * @throws IllegalArgumentException if {@code serverName} is null or empty, or the
+   *                                  given port is not a number
+   */
+  public static ServerName createWithDefaultPort(final String serverName,
+                                                 final int defaultPort) {
+    if (serverName == null || serverName.length() <= 0) {
+      throw new IllegalArgumentException("Passed hostname is null or empty ("
+          + serverName + ")");
+    }
+    if (serverName.indexOf(SERVERNAME_SEPARATOR) == -1) {
+      // No port given: the whole text is the hostname. parseHostname() cannot be used
+      // here because it rejects text without a separator.
+      return new ServerName(serverName, defaultPort);
+    }
+    return new ServerName(parseHostname(serverName), parsePort(serverName));
+  }
+
+  /**
+   * Extracts the hostname, i.e. everything before the first separator.
+   *
+   * @throws IllegalArgumentException if the text is null, empty or has no separator
+   */
+  public static String parseHostname(final String serverName) {
+    if (serverName == null || serverName.length() <= 0) {
+      throw new IllegalArgumentException("Passed hostname is null or empty ("
+          + serverName + ")");
+    }
+    int index = serverName.indexOf(SERVERNAME_SEPARATOR);
+    if (index == -1) { // if a port is missing, the index will be set to -1.
+      throw new IllegalArgumentException("Passed port is missing ("
+          + serverName + ")");
+    }
+    return serverName.substring(0, index);
+  }
+
+  /**
+   * Extracts the port, i.e. the text between the first and the second separator
+   * (or the end of the text when there is no second separator).
+   *
+   * @throws IllegalArgumentException if the text is null, empty or has no separator
+   * @throws NumberFormatException    if the port text is not a valid integer
+   */
+  public static int parsePort(final String serverName) {
+    if (serverName == null || serverName.length() <= 0) {
+      throw new IllegalArgumentException("Passed hostname is null or empty ("
+          + serverName + ")");
+    }
+    int index = serverName.indexOf(SERVERNAME_SEPARATOR);
+    if (index == -1) {
+      throw new IllegalArgumentException("Passed port is missing ("
+          + serverName + ")");
+    }
+    int start = index + SERVERNAME_SEPARATOR.length();
+    int end = serverName.indexOf(SERVERNAME_SEPARATOR, start);
+    String portText = (end == -1)
+        ? serverName.substring(start)
+        : serverName.substring(start, end);
+    return Integer.parseInt(portText);
+  }
+
+  @Override
+  public String toString() {
+    return getServerName();
+  }
+
+  public String getServerName() {
+    return serverName;
+  }
+
+  public String getHostname() {
+    return hostname;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  /**
+   * Joins a hostname and a port with {@link #SERVERNAME_SEPARATOR}.
+   */
+  public static String getServerName(String hostName, int port) {
+    final StringBuilder name = new StringBuilder(hostName.length() + 4);
+    name.append(hostName);
+    name.append(SERVERNAME_SEPARATOR);
+    name.append(port);
+    return name.toString();
+  }
+
+  /** Locale-independent lower-casing shared by compareTo() and hashCode(). */
+  private static String normalize(String hostName) {
+    return hostName.toLowerCase(java.util.Locale.ROOT);
+  }
+
+  /**
+   * Orders by case-insensitive hostname first and by port second.
+   */
+  @Override
+  public int compareTo(ServerName other) {
+    int compare = normalize(this.getHostname()).
+      compareTo(normalize(other.getHostname()));
+    if (compare != 0) return compare;
+    // Explicit comparison: subtracting the ports can overflow for extreme values.
+    int thisPort = this.getPort();
+    int otherPort = other.getPort();
+    return thisPort < otherPort ? -1 : (thisPort == otherPort ? 0 : 1);
+  }
+
+  /**
+   * Hashes the same case-insensitive hostname and port that equals() compares,
+   * so equal server names always share a hash code.
+   */
+  @Override
+  public int hashCode() {
+    return 31 * normalize(getHostname()).hashCode() + getPort();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null) return false;
+    if (!(o instanceof ServerName)) return false;
+    return this.compareTo((ServerName)o) == 0;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
new file mode 100644
index 0000000..c3a9a59
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.ExecutionBlockId;
+
+/**
+ * Event typed by {@link ContainerAllocatorEventType} that carries an execution block id
+ * together with a priority, a resource, a required container count, a leaf-query flag
+ * and a progress value. All values are fixed at construction time.
+ */
+public class ContainerAllocationEvent extends AbstractEvent<ContainerAllocatorEventType>  {
+
+  private final ExecutionBlockId executionBlockId;
+  private final Priority priority;
+  private final Resource resource;
+  private final int requiredNum;
+  private final boolean isLeafQuery;
+  private final float progress;
+
+  /**
+   * Stores every argument unchanged; the event type is passed on to the super class.
+   */
+  public ContainerAllocationEvent(ContainerAllocatorEventType eventType,
+                                  ExecutionBlockId executionBlockId,
+                                  Priority priority,
+                                  Resource resource,
+                                  int requiredNum,
+                                  boolean isLeafQuery, float progress) {
+    super(eventType);
+    this.executionBlockId = executionBlockId;
+    this.priority = priority;
+    this.resource = resource;
+    this.requiredNum = requiredNum;
+    this.isLeafQuery = isLeafQuery;
+    this.progress = progress;
+  }
+
+  /** @return the execution block id given at construction */
+  public ExecutionBlockId getExecutionBlockId() {
+    return this.executionBlockId;
+  }
+
+  /** @return the priority given at construction */
+  public Priority getPriority() {
+    return this.priority;
+  }
+
+  /** @return the required number given at construction */
+  public int getRequiredNum() {
+    return this.requiredNum;
+  }
+
+  /** @return the leaf-query flag given at construction */
+  public boolean isLeafQuery() {
+    return this.isLeafQuery;
+  }
+
+  /** @return the same resource object as {@link #getResource()} */
+  public Resource getCapability() {
+    return this.resource;
+  }
+
+  /** @return the progress value given at construction */
+  public float getProgress() {
+    return this.progress;
+  }
+
+  /** @return the resource given at construction */
+  public Resource getResource() {
+    return this.resource;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java
new file mode 100644
index 0000000..4d10efe
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+/**
+ * Event types used by {@link ContainerAllocationEvent}.
+ * NOTE(review): the per-constant meanings are inferred from the names only
+ * (request / deallocate / failed) - confirm against the allocator implementation.
+ */
+public enum ContainerAllocatorEventType {
+  // producer: QueryUnitAttempt, consumer: ContainerAllocator
+  CONTAINER_REQ,
+  CONTAINER_DEALLOCATE,
+  CONTAINER_FAILED
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerEvent.java
new file mode 100644
index 0000000..723ac1a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.master.event.ContainerEvent.EventType;
+
+/**
+ * Event that pairs a container life-cycle {@link EventType} with the id of the
+ * container it refers to.
+ */
+public class ContainerEvent extends AbstractEvent<EventType> {
+  /** Types of container events. */
+  public enum EventType {
+    CONTAINER_LAUNCHED,
+    CONTAINER_STOPPED
+  }
+
+  private final ContainerId cId;
+
+  /**
+   * @param eventType type of the event, passed on to the super class
+   * @param cId       id of the container the event refers to
+   */
+  public ContainerEvent(EventType eventType, ContainerId cId) {
+    super(eventType);
+    this.cId = cId;
+  }
+
+  /**
+   * Exposes the container id, which was previously stored but unreadable.
+   *
+   * @return the container id given at construction
+   */
+  public ContainerId getContainerId() {
+    return cId;
+  }
+}