You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 13:44:29 UTC
[26/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core
into the top-level modules. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
new file mode 100644
index 0000000..c968a73
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -0,0 +1,754 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.*;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.NoSuchDatabaseException;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.*;
+import org.apache.tajo.ipc.TajoMasterClientProtocol;
+import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.master.querymaster.QueryInfo;
+import org.apache.tajo.master.querymaster.QueryJobEvent;
+import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.master.session.InvalidSessionException;
+import org.apache.tajo.master.session.NoSuchSessionVariableException;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.rpc.BlockingRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.ProtoUtil;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+
+public class TajoMasterClientService extends AbstractService {
+ private final static Log LOG = LogFactory.getLog(TajoMasterClientService.class);
+ private final MasterContext context;
+ private final TajoConf conf;
+ private final CatalogService catalog;
+ private final TajoMasterClientProtocolServiceHandler clientHandler;
+ private BlockingRpcServer server;
+ private InetSocketAddress bindAddress;
+
+ private final BoolProto BOOL_TRUE =
+ BoolProto.newBuilder().setValue(true).build();
+ private final BoolProto BOOL_FALSE =
+ BoolProto.newBuilder().setValue(false).build();
+
+ /**
+  * Creates the client-facing service of the master.
+  *
+  * @param context master context from which the configuration and catalog are taken
+  */
+ public TajoMasterClientService(MasterContext context) {
+   super(TajoMasterClientService.class.getName());
+   this.context = context;
+   conf = context.getConf();
+   catalog = context.getCatalog();
+   clientHandler = new TajoMasterClientProtocolServiceHandler();
+ }
+
+ /**
+  * Creates and starts the blocking RPC server that serves client requests, writes the
+  * resulting connect address back into the configuration, and then starts this service.
+  *
+  * @throws RuntimeException if the RPC server cannot be constructed
+  */
+ @Override
+ public void start() {
+   // start the rpc server
+   InetSocketAddress configuredAddress =
+       NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS));
+   int rpcWorkerThreads = conf.getIntVar(ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM);
+   try {
+     server = new BlockingRpcServer(TajoMasterClientProtocol.class, clientHandler, configuredAddress,
+         rpcWorkerThreads);
+   } catch (Exception e) {
+     LOG.error(e);
+     throw new RuntimeException(e);
+   }
+   server.start();
+
+   // Publish the address the server is actually reachable at.
+   bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
+   conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
+   LOG.info("Instantiated TajoMasterClientService at " + bindAddress);
+   super.start();
+ }
+
+ /**
+  * Shuts down the RPC server when one has been created, then stops this service.
+  */
+ @Override
+ public void stop() {
+   if (null != server) {
+     server.shutdown();
+   }
+   super.stop();
+ }
+
+ /**
+  * @return the connect address recorded by {@link #start()}, or {@code null} if it has not been set yet
+  */
+ public InetSocketAddress getBindAddress() {
+   return bindAddress;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+ // TajoMasterClientProtocolService
+ /////////////////////////////////////////////////////////////////////////////
+ public class TajoMasterClientProtocolServiceHandler implements TajoMasterClientProtocolService.BlockingInterface {
+ /**
+  * Creates a new session on the requested base database, or on the default database when
+  * the request does not name one.
+  *
+  * @return a SUCCESS response carrying the new session id, or a FAILED response carrying the
+  *         exception message when the database does not exist or the session is invalid
+  */
+ @Override
+ public CreateSessionResponse createSession(RpcController controller, CreateSessionRequest request)
+     throws ServiceException {
+   try {
+     // If no database name is given, the default database is used.
+     String baseDatabase = DEFAULT_DATABASE_NAME;
+     if (request.hasBaseDatabaseName()) {
+       baseDatabase = request.getBaseDatabaseName();
+     }
+
+     if (!context.getCatalog().existDatabase(baseDatabase)) {
+       LOG.info("Session creation is canceled due to absent base database \"" + baseDatabase + "\".");
+       throw new NoSuchDatabaseException(baseDatabase);
+     }
+
+     String newSessionId = context.getSessionManager().createSession(request.getUsername(), baseDatabase);
+     return CreateSessionResponse.newBuilder()
+         .setState(CreateSessionResponse.ResultState.SUCCESS)
+         .setSessionId(TajoIdProtos.SessionIdProto.newBuilder().setId(newSessionId).build())
+         .build();
+   } catch (NoSuchDatabaseException nsde) {
+     return failedSessionResponse(nsde.getMessage());
+   } catch (InvalidSessionException ise) {
+     return failedSessionResponse(ise.getMessage());
+   }
+ }
+
+ /** Builds a FAILED session-creation response carrying the given message. */
+ private CreateSessionResponse failedSessionResponse(String message) {
+   return CreateSessionResponse.newBuilder()
+       .setState(CreateSessionResponse.ResultState.FAILED)
+       .setMessage(message)
+       .build();
+ }
+
+ /**
+  * Removes the session identified by the request. A {@code null} request is ignored.
+  *
+  * @return always {@code ProtoUtil.TRUE}
+  */
+ @Override
+ public BoolProto removeSession(RpcController controller, TajoIdProtos.SessionIdProto request)
+     throws ServiceException {
+   if (request == null) {
+     return ProtoUtil.TRUE;
+   }
+   context.getSessionManager().removeSession(request.getId());
+   return ProtoUtil.TRUE;
+ }
+
+ /**
+  * Sets every key/value pair listed in the request on the session, then removes every
+  * variable listed for unsetting.
+  *
+  * @return {@code ProtoUtil.TRUE} after all changes have been applied
+  * @throws ServiceException wrapping anything thrown while updating the session
+  */
+ @Override
+ public BoolProto updateSessionVariables(RpcController controller, UpdateSessionVariableRequest request)
+     throws ServiceException {
+   try {
+     final String targetSession = request.getSessionId().getId();
+
+     // Assignments are applied before removals.
+     for (CatalogProtos.KeyValueProto assignment : request.getSetVariables().getKeyvalList()) {
+       context.getSessionManager().setVariable(targetSession, assignment.getKey(), assignment.getValue());
+     }
+     for (String variableName : request.getUnsetVariablesList()) {
+       context.getSessionManager().removeVariable(targetSession, variableName);
+     }
+     return ProtoUtil.TRUE;
+   } catch (Throwable cause) {
+     throw new ServiceException(cause);
+   }
+ }
+
+ /**
+  * Looks up one session variable; the request value is the variable name.
+  *
+  * @return the variable's value converted to a {@code StringProto}
+  * @throws ServiceException wrapping anything thrown by the lookup
+  */
+ @Override
+ public StringProto getSessionVariable(RpcController controller, SessionedStringProto request)
+     throws ServiceException {
+   try {
+     String variableValue =
+         context.getSessionManager().getVariable(request.getSessionId().getId(), request.getValue());
+     return ProtoUtil.convertString(variableValue);
+   } catch (Throwable cause) {
+     throw new ServiceException(cause);
+   }
+ }
+
+ /**
+  * Tells whether the session has a variable with the name given as the request value.
+  *
+  * @return {@code ProtoUtil.TRUE} when the lookup yields a non-null value; {@code ProtoUtil.FALSE}
+  *         when it yields null or raises {@code NoSuchSessionVariableException}
+  * @throws ServiceException wrapping any other failure
+  */
+ @Override
+ public BoolProto existSessionVariable(RpcController controller, SessionedStringProto request) throws ServiceException {
+   try {
+     String found = context.getSessionManager().getVariable(request.getSessionId().getId(), request.getValue());
+     return (found != null) ? ProtoUtil.TRUE : ProtoUtil.FALSE;
+   } catch (NoSuchSessionVariableException missing) {
+     return ProtoUtil.FALSE;
+   } catch (Throwable cause) {
+     throw new ServiceException(cause);
+   }
+ }
+
+ /**
+  * Returns all variables of the session identified by the request.
+  *
+  * @return the variables copied into an {@code Options} and rendered as its proto
+  * @throws ServiceException wrapping anything thrown while reading the session
+  */
+ @Override
+ public CatalogProtos.KeyValueSetProto getAllSessionVariables(RpcController controller,
+                                                              TajoIdProtos.SessionIdProto request)
+     throws ServiceException {
+   try {
+     Options variables = new Options();
+     variables.putAll(context.getSessionManager().getAllVariables(request.getId()));
+     return variables.getProto();
+   } catch (Throwable cause) {
+     throw new ServiceException(cause);
+   }
+ }
+
+ /**
+  * Returns the current database of the session identified by the request.
+  *
+  * @throws ServiceException wrapping anything thrown while resolving the session
+  */
+ @Override
+ public StringProto getCurrentDatabase(RpcController controller, TajoIdProtos.SessionIdProto request)
+     throws ServiceException {
+   try {
+     Session session = context.getSessionManager().getSession(request.getId());
+     return ProtoUtil.convertString(session.getCurrentDatabase());
+   } catch (Throwable cause) {
+     throw new ServiceException(cause);
+   }
+ }
+
+ /**
+  * Switches the session's current database to the one named by the request value.
+  *
+  * @return {@code ProtoUtil.TRUE} when the database exists and has been selected
+  * @throws ServiceException wrapping {@code NoSuchDatabaseException} when the database does not
+  *         exist, or wrapping any other failure raised while resolving the session
+  */
+ @Override
+ public BoolProto selectDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
+   try {
+     String sessionId = request.getSessionId().getId();
+     String databaseName = request.getValue();
+
+     if (!context.getCatalog().existDatabase(databaseName)) {
+       throw new ServiceException(new NoSuchDatabaseException(databaseName));
+     }
+     context.getSessionManager().getSession(sessionId).selectDatabase(databaseName);
+     return ProtoUtil.TRUE;
+   } catch (ServiceException se) {
+     // Already in the form callers expect; do not wrap it in a second ServiceException.
+     throw se;
+   } catch (Throwable t) {
+     throw new ServiceException(t);
+   }
+ }
+
+ @Override
+ public SubmitQueryResponse submitQuery(RpcController controller, QueryRequest request) throws ServiceException {
+
+
+ try {
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Query [" + request.getQuery() + "] is submitted");
+ }
+ return context.getGlobalEngine().executeQuery(session, request.getQuery());
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ SubmitQueryResponse.Builder responseBuilder = ClientProtos.SubmitQueryResponse.newBuilder();
+ responseBuilder.setUserName(context.getConf().getVar(ConfVars.USERNAME));
+ responseBuilder.setResultCode(ResultCode.ERROR);
+ if (e.getMessage() != null) {
+ responseBuilder.setErrorMessage(ExceptionUtils.getStackTrace(e));
+ } else {
+ responseBuilder.setErrorMessage("Internal Error");
+ }
+ return responseBuilder.build();
+ }
+ }
+
+ @Override
+ public UpdateQueryResponse updateQuery(RpcController controller, QueryRequest request) throws ServiceException {
+
+ try {
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+ UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder();
+ try {
+ context.getGlobalEngine().updateQuery(session, request.getQuery());
+ builder.setResultCode(ResultCode.OK);
+ return builder.build();
+ } catch (Exception e) {
+ builder.setResultCode(ResultCode.ERROR);
+ if (e.getMessage() == null) {
+ builder.setErrorMessage(ExceptionUtils.getStackTrace(e));
+ }
+ return builder.build();
+ }
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
+ }
+
+ /**
+  * Reports the outcome of a query, looking first among the in-progress queries and then among
+  * the finished ones.
+  *
+  * @return a response whose error message is left unset for a succeeded query and is set to
+  *         "No such query", "is failed" or "is still running" otherwise
+  * @throws ServiceException wrapping any failure raised while handling the request
+  */
+ @Override
+ public GetQueryResultResponse getQueryResult(RpcController controller,
+                                              GetQueryResultRequest request) throws ServiceException {
+   try {
+     context.getSessionManager().touch(request.getSessionId().getId());
+     QueryId queryId = new QueryId(request.getQueryId());
+     QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
+
+     // if we cannot get a QueryInProgress instance from QueryJobManager,
+     // the instance can be in the finished query list.
+     if (queryInProgress == null) {
+       queryInProgress = context.getQueryJobManager().getFinishedQuery(queryId);
+     }
+
+     GetQueryResultResponse.Builder builder = GetQueryResultResponse.newBuilder();
+
+     // If the instance is not in the finished list either,
+     // the query result was expired due to timeout.
+     // In this case, we will result in error.
+     if (queryInProgress == null) {
+       builder.setErrorMessage("No such query: " + queryId.toString());
+       return builder.build();
+     }
+
+     QueryInfo queryInfo = queryInProgress.getQueryInfo();
+
+     try {
+       //TODO After implementation Tajo's user security feature, Should be modified.
+       builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName());
+     } catch (IOException e) {
+       LOG.warn("Can't get current user name", e);
+     }
+     switch (queryInfo.getQueryState()) {
+       case QUERY_SUCCEEDED:
+         // TODO check this logic needed
+         //builder.setTableDesc((TableDescProto) queryJobManager.getResultDesc().getProto());
+         break;
+       case QUERY_FAILED:
+       case QUERY_ERROR:
+         builder.setErrorMessage("Query " + queryId + " is failed");
+         // Without this break the failure message was overwritten by the default branch.
+         break;
+       default:
+         builder.setErrorMessage("Query " + queryId + " is still running");
+         break;
+     }
+
+     return builder.build();
+   } catch (Throwable t) {
+     throw new ServiceException(t);
+   }
+ }
+
+ /**
+  * Lists the queries that the query job manager reports as running.
+  *
+  * @throws ServiceException wrapping any failure raised while handling the request
+  */
+ @Override
+ public GetQueryListResponse getRunningQueryList(RpcController controller, GetQueryListRequest request)
+     throws ServiceException {
+   try {
+     context.getSessionManager().touch(request.getSessionId().getId());
+     return toQueryListResponse(context.getQueryJobManager().getRunningQueries());
+   } catch (Throwable t) {
+     throw new ServiceException(t);
+   }
+ }
+
+ /**
+  * Lists the queries that the query job manager reports as finished.
+  *
+  * @throws ServiceException wrapping any failure raised while handling the request
+  */
+ @Override
+ public GetQueryListResponse getFinishedQueryList(RpcController controller, GetQueryListRequest request)
+     throws ServiceException {
+   try {
+     context.getSessionManager().touch(request.getSessionId().getId());
+     return toQueryListResponse(context.getQueryJobManager().getFinishedQueries());
+   } catch (Throwable t) {
+     throw new ServiceException(t);
+   }
+ }
+
+ /**
+  * Converts the given queries into a query list response, one brief entry per query.
+  * A finish time of 0 is reported as the current time.
+  */
+ private GetQueryListResponse toQueryListResponse(Collection<QueryInProgress> queries) {
+   GetQueryListResponse.Builder builder = GetQueryListResponse.newBuilder();
+
+   for (QueryInProgress queryInProgress : queries) {
+     QueryInfo queryInfo = queryInProgress.getQueryInfo();
+
+     // A fresh builder per entry, so a field skipped below can never carry over
+     // the value of the previous query.
+     BriefQueryInfo.Builder infoBuilder = BriefQueryInfo.newBuilder();
+     infoBuilder.setQueryId(queryInfo.getQueryId().getProto());
+     infoBuilder.setState(queryInfo.getQueryState());
+     infoBuilder.setQuery(queryInfo.getSql());
+     infoBuilder.setStartTime(queryInfo.getStartTime());
+     long endTime = (queryInfo.getFinishTime() == 0) ?
+         System.currentTimeMillis() : queryInfo.getFinishTime();
+     infoBuilder.setFinishTime(endTime);
+     infoBuilder.setProgress(queryInfo.getProgress());
+     infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort());
+     // The host can be null (getQueryStatus guards the same getter); passing null to a
+     // protobuf setter throws, which used to fail the whole listing.
+     if (queryInfo.getQueryMasterHost() != null) {
+       infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+     }
+
+     builder.addQueryList(infoBuilder.build());
+   }
+
+   return builder.build();
+ }
+
+ @Override
+ public GetQueryStatusResponse getQueryStatus(RpcController controller,
+ GetQueryStatusRequest request)
+ throws ServiceException {
+
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+
+ GetQueryStatusResponse.Builder builder = GetQueryStatusResponse.newBuilder();
+ QueryId queryId = new QueryId(request.getQueryId());
+ builder.setQueryId(request.getQueryId());
+
+ if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
+ builder.setResultCode(ResultCode.OK);
+ builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
+ } else {
+ QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
+
+ // It will try to find a query status from a finished query list.
+ if (queryInProgress == null) {
+ queryInProgress = context.getQueryJobManager().getFinishedQuery(queryId);
+ }
+ if (queryInProgress != null) {
+ QueryInfo queryInfo = queryInProgress.getQueryInfo();
+ builder.setResultCode(ResultCode.OK);
+ builder.setState(queryInfo.getQueryState());
+ builder.setProgress(queryInfo.getProgress());
+ builder.setSubmitTime(queryInfo.getStartTime());
+ if(queryInfo.getQueryMasterHost() != null) {
+ builder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+ builder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
+ }
+ if (queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+ builder.setFinishTime(queryInfo.getFinishTime());
+ } else {
+ builder.setFinishTime(System.currentTimeMillis());
+ }
+ } else {
+ builder.setResultCode(ResultCode.ERROR);
+ builder.setErrorMessage("No such query: " + queryId.toString());
+ }
+ }
+ return builder.build();
+
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
+ }
+
+ /**
+  * It is invoked by TajoContainerProxy.
+  *
+  * Posts a QUERY_JOB_KILL event for the requested query to the query job manager's event handler.
+  *
+  * @return {@code BOOL_TRUE} once the event has been handed over
+  * @throws ServiceException wrapping any failure raised while handling the request
+  */
+ @Override
+ public BoolProto killQuery(RpcController controller, KillQueryRequest request) throws ServiceException {
+   try {
+     context.getSessionManager().touch(request.getSessionId().getId());
+     QueryId targetQuery = new QueryId(request.getQueryId());
+     QueryJobEvent killEvent = new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL, new QueryInfo(targetQuery));
+     context.getQueryJobManager().getEventHandler().handle(killEvent);
+     return BOOL_TRUE;
+   } catch (Throwable cause) {
+     throw new ServiceException(cause);
+   }
+ }
+
+ /**
+  * Describes every worker known to the resource manager: host, ports, slot and memory
+  * capacity and usage, heap figures, task counts and state.
+  *
+  * Workers are listed in ascending order of their key in the resource manager's worker map.
+  *
+  * @throws ServiceException wrapping any failure raised while handling the request
+  */
+ @Override
+ public GetClusterInfoResponse getClusterInfo(RpcController controller,
+                                              GetClusterInfoRequest request)
+     throws ServiceException {
+   try {
+     context.getSessionManager().touch(request.getSessionId().getId());
+     GetClusterInfoResponse.Builder builder = GetClusterInfoResponse.newBuilder();
+
+     // Snapshot the workers sorted by key. The original sorted a copy of the keys but then
+     // iterated the map's values directly, so the sort had no effect on the output order.
+     Map<String, Worker> sortedWorkers = new TreeMap<String, Worker>(context.getResourceManager().getWorkers());
+
+     WorkerResourceInfo.Builder workerBuilder
+         = WorkerResourceInfo.newBuilder();
+
+     // The builder is reused; every field below is overwritten for each worker.
+     for (Worker worker : sortedWorkers.values()) {
+       WorkerResource workerResource = worker.getResource();
+       workerBuilder.setAllocatedHost(worker.getHostName());
+       workerBuilder.setDiskSlots(workerResource.getDiskSlots());
+       workerBuilder.setCpuCoreSlots(workerResource.getCpuCoreSlots());
+       workerBuilder.setMemoryMB(workerResource.getMemoryMB());
+       workerBuilder.setLastHeartbeat(worker.getLastHeartbeatTime());
+       workerBuilder.setUsedMemoryMB(workerResource.getUsedMemoryMB());
+       workerBuilder.setUsedCpuCoreSlots(workerResource.getUsedCpuCoreSlots());
+       workerBuilder.setUsedDiskSlots(workerResource.getUsedDiskSlots());
+       workerBuilder.setWorkerStatus(worker.getState().toString());
+       workerBuilder.setQueryMasterMode(workerResource.isQueryMasterMode());
+       workerBuilder.setTaskRunnerMode(workerResource.isTaskRunnerMode());
+       workerBuilder.setPeerRpcPort(worker.getPeerRpcPort());
+       workerBuilder.setQueryMasterPort(worker.getQueryMasterPort());
+       workerBuilder.setClientPort(worker.getClientPort());
+       workerBuilder.setPullServerPort(worker.getPullServerPort());
+       workerBuilder.setHttpPort(worker.getHttpPort());
+       workerBuilder.setMaxHeap(workerResource.getMaxHeap());
+       workerBuilder.setFreeHeap(workerResource.getFreeHeap());
+       workerBuilder.setTotalHeap(workerResource.getTotalHeap());
+       workerBuilder.setNumRunningTasks(workerResource.getNumRunningTasks());
+       workerBuilder.setNumQueryMasterTasks(workerResource.getNumQueryMasterTasks());
+
+       builder.addWorkerList(workerBuilder.build());
+     }
+
+     return builder.build();
+   } catch (Throwable t) {
+     throw new ServiceException(t);
+   }
+ }
+
+ @Override
+ public BoolProto createDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
+ try {
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+ if (context.getGlobalEngine().createDatabase(session, request.getValue(), null, false)) {
+ return BOOL_TRUE;
+ } else {
+ return BOOL_FALSE;
+ }
+ } catch (Throwable e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public BoolProto existDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ if (catalog.existDatabase(request.getValue())) {
+ return BOOL_TRUE;
+ } else {
+ return BOOL_FALSE;
+ }
+ } catch (Throwable e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public BoolProto dropDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
+ try {
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ if (context.getGlobalEngine().dropDatabase(session, request.getValue(), false)) {
+ return BOOL_TRUE;
+ } else {
+ return BOOL_FALSE;
+ }
+ } catch (Throwable e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public PrimitiveProtos.StringListProto getAllDatabases(RpcController controller, TajoIdProtos.SessionIdProto
+ request) throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getId());
+ return ProtoUtil.convertStrings(catalog.getAllDatabaseNames());
+ } catch (Throwable e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ /**
+  * Tells whether the catalog has the table named by the request value. A fully qualified name
+  * is split into database and table; otherwise the session's current database is used.
+  *
+  * @throws ServiceException wrapping any failure raised while handling the request
+  */
+ @Override
+ public BoolProto existTable(RpcController controller, SessionedStringProto request) throws ServiceException {
+   try {
+     Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+     final String database;
+     final String table;
+     if (CatalogUtil.isFQTableName(request.getValue())) {
+       String[] parts = CatalogUtil.splitFQTableName(request.getValue());
+       database = parts[0];
+       table = parts[1];
+     } else {
+       database = session.getCurrentDatabase();
+       table = request.getValue();
+     }
+
+     return catalog.existsTable(database, table) ? BOOL_TRUE : BOOL_FALSE;
+   } catch (Throwable cause) {
+     throw new ServiceException(cause);
+   }
+ }
+
+ /**
+  * Lists the table names of the database named in the request, or of the session's current
+  * database when the request names none.
+  *
+  * @throws ServiceException wrapping any failure raised while handling the request
+  */
+ @Override
+ public GetTableListResponse getTableList(RpcController controller,
+                                          GetTableListRequest request) throws ServiceException {
+   try {
+     Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+     String database = request.hasDatabaseName() ? request.getDatabaseName() : session.getCurrentDatabase();
+     Collection<String> names = catalog.getAllTableNames(database);
+     return GetTableListResponse.newBuilder().addAllTables(names).build();
+   } catch (Throwable cause) {
+     throw new ServiceException(cause);
+   }
+ }
+
+ /**
+  * Returns the description of the table named in the request. A fully qualified name is split
+  * into database and table; otherwise the session's current database is used.
+  *
+  * @return an OK response with the table description, or an ERROR response when the catalog has
+  *         no such table
+  * @throws ServiceException wrapping any failure raised while handling the request
+  */
+ @Override
+ public TableResponse getTableDesc(RpcController controller, GetTableDescRequest request) throws ServiceException {
+   try {
+     Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+     final String database;
+     final String table;
+     if (CatalogUtil.isFQTableName(request.getTableName())) {
+       String[] parts = CatalogUtil.splitFQTableName(request.getTableName());
+       database = parts[0];
+       table = parts[1];
+     } else {
+       database = session.getCurrentDatabase();
+       table = request.getTableName();
+     }
+
+     if (!catalog.existsTable(database, table)) {
+       return TableResponse.newBuilder()
+           .setResultCode(ResultCode.ERROR)
+           .setErrorMessage("ERROR: no such a table: " + request.getTableName())
+           .build();
+     }
+     return TableResponse.newBuilder()
+         .setResultCode(ResultCode.OK)
+         .setTableDesc(catalog.getTableDesc(database, table).getProto())
+         .build();
+   } catch (Throwable cause) {
+     throw new ServiceException(cause);
+   }
+ }
+
+ /**
+  * Creates a table over an existing path through the global engine.
+  *
+  * @return an OK response with the new table description. An ERROR response carrying the
+  *         exception message is returned when the session is invalid, the path does not exist
+  *         or cannot be checked, or the engine fails to create the table.
+  */
+ @Override
+ public TableResponse createExternalTable(RpcController controller, CreateTableRequest request)
+     throws ServiceException {
+   try {
+     Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+     Path path = new Path(request.getPath());
+     FileSystem fs = path.getFileSystem(conf);
+     if (!fs.exists(path)) {
+       throw new IOException("No such a directory: " + path);
+     }
+
+     Schema schema = new Schema(request.getSchema());
+     TableMeta meta = new TableMeta(request.getMeta());
+     // The partition method is optional in the request.
+     PartitionMethodDesc partitionDesc =
+         request.hasPartition() ? new PartitionMethodDesc(request.getPartition()) : null;
+
+     TableDesc created;
+     try {
+       created = context.getGlobalEngine().createTableOnPath(session, request.getName(), schema,
+           meta, path, true, partitionDesc, false);
+     } catch (Exception e) {
+       return tableErrorResponse(e.getMessage());
+     }
+
+     return TableResponse.newBuilder()
+         .setResultCode(ResultCode.OK)
+         .setTableDesc(created.getProto()).build();
+   } catch (InvalidSessionException ise) {
+     return tableErrorResponse(ise.getMessage());
+   } catch (IOException ioe) {
+     return tableErrorResponse(ioe.getMessage());
+   }
+ }
+
+ /** Builds an ERROR table response carrying the given message. */
+ private TableResponse tableErrorResponse(String message) {
+   return TableResponse.newBuilder()
+       .setResultCode(ResultCode.ERROR)
+       .setErrorMessage(message).build();
+ }
+
+ /**
+  * Asks the global engine to drop the named table, passing on the request's purge flag.
+  *
+  * @return {@code BOOL_TRUE} when the engine call returns normally
+  * @throws ServiceException wrapping any failure raised while handling the request
+  */
+ @Override
+ public BoolProto dropTable(RpcController controller, DropTableRequest dropTable) throws ServiceException {
+   try {
+     String sessionId = dropTable.getSessionId().getId();
+     Session session = context.getSessionManager().getSession(sessionId);
+     context.getGlobalEngine().dropTable(session, dropTable.getName(), false, dropTable.getPurge());
+     return BOOL_TRUE;
+   } catch (Throwable cause) {
+     throw new ServiceException(cause);
+   }
+ }
+
+ @Override
+ public FunctionResponse getFunctionList(RpcController controller, SessionedStringProto request)
+ throws ServiceException {
+
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+
+ String functionName = request.getValue();
+ Collection<FunctionDesc> functions = catalog.getFunctions();
+
+ List<CatalogProtos.FunctionDescProto> functionProtos = new ArrayList<CatalogProtos.FunctionDescProto>();
+
+ for (FunctionDesc eachFunction: functions) {
+ if (functionName == null || functionName.isEmpty()) {
+ functionProtos.add(eachFunction.getProto());
+ } else {
+ if(functionName.equals(eachFunction.getSignature())) {
+ functionProtos.add(eachFunction.getProto());
+ }
+ }
+ }
+ return FunctionResponse.newBuilder()
+ .setResultCode(ResultCode.OK)
+ .addAllFunctions(functionProtos)
+ .build();
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
new file mode 100644
index 0000000..5e9f729
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+
+public class TajoMasterService extends AbstractService {
+ private final static Log LOG = LogFactory.getLog(TajoMasterService.class);
+
+ private final TajoMaster.MasterContext context;
+ private final TajoConf conf;
+ private final TajoMasterServiceHandler masterHandler;
+ private AsyncRpcServer server;
+ private InetSocketAddress bindAddress;
+
+ private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
+ private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build();
+
+ /**
+  * Creates the master service.
+  *
+  * @param context master context from which the configuration is taken
+  */
+ public TajoMasterService(TajoMaster.MasterContext context) {
+   super(TajoMasterService.class.getName());
+   this.context = context;
+   conf = context.getConf();
+   masterHandler = new TajoMasterServiceHandler();
+ }
+
+ /**
+  * Creates and starts the asynchronous RPC server of the master, writes the resulting
+  * connect address back into the configuration, and then starts this service.
+  *
+  * @throws RuntimeException if the RPC server cannot be constructed
+  */
+ @Override
+ public void start() {
+   String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+   InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+   int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
+   try {
+     server = new AsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa, workerNum);
+   } catch (Exception e) {
+     // Fail fast with the real cause. Continuing would only hit a NullPointerException on
+     // server.start() below and hide why the server could not be created.
+     LOG.error(e.getMessage(), e);
+     throw new RuntimeException(e);
+   }
+   server.start();
+   bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
+   this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
+       NetUtils.normalizeInetSocketAddress(bindAddress));
+   LOG.info("Instantiated TajoMasterService at " + this.bindAddress);
+   super.start();
+ }
+
+ /**
+  * Shuts down the RPC server when one exists and drops the reference to it, then stops
+  * this service.
+  */
+ @Override
+ public void stop() {
+   if (null != server) {
+     server.shutdown();
+     server = null;
+   }
+   super.stop();
+ }
+
+ /**
+  * @return the connect address recorded by {@link #start()}, or {@code null} if it has not been set yet
+  */
+ public InetSocketAddress getBindAddress() {
+   return this.bindAddress;
+ }
+
+ public class TajoMasterServiceHandler
+ implements TajoMasterProtocol.TajoMasterProtocolService.Interface {
+ /**
+  * Passes the heartbeat to the query job manager and answers with a positive heartbeat
+  * result, the manager's response command (when it returned one) and the cluster resource
+  * summary.
+  */
+ @Override
+ public void heartbeat(
+     RpcController controller,
+     TajoMasterProtocol.TajoHeartbeat request, RpcCallback<TajoMasterProtocol.TajoHeartbeatResponse> done) {
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Received QueryHeartbeat:" + request.getTajoWorkerHost() + ":" + request.getTajoQueryMasterPort());
+   }
+
+   TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand responseCommand =
+       context.getQueryJobManager().queryHeartbeat(request);
+
+   TajoMasterProtocol.TajoHeartbeatResponse.Builder response =
+       TajoMasterProtocol.TajoHeartbeatResponse.newBuilder();
+   response.setHeartbeatResult(BOOL_TRUE);
+   // The command is only attached when the manager returned one.
+   if (responseCommand != null) {
+     response.setResponseCommand(responseCommand);
+   }
+   response.setClusterResourceSummary(context.getResourceManager().getClusterResourceSummary());
+
+   done.run(response.build());
+ }
+
+ /**
+  * Forwards the allocation request and its callback to the resource manager; this method
+  * does not invoke the callback itself.
+  */
+ @Override
+ public void allocateWorkerResources(RpcController controller,
+                                     TajoMasterProtocol.WorkerResourceAllocationRequest request,
+                                     RpcCallback<TajoMasterProtocol.WorkerResourceAllocationResponse> done) {
+   context.getResourceManager().allocateWorkerResources(request, done);
+ }
+
+ /**
+  * Releases, one by one, every container listed in the request through the resource
+  * manager, then answers {@code BOOL_TRUE}.
+  */
+ @Override
+ public void releaseWorkerResource(RpcController controller,
+                                   TajoMasterProtocol.WorkerResourceReleaseRequest request,
+                                   RpcCallback<PrimitiveProtos.BoolProto> done) {
+   for (YarnProtos.ContainerIdProto containerId : request.getContainerIdsList()) {
+     context.getResourceManager().releaseWorkerResource(containerId);
+   }
+   done.run(BOOL_TRUE);
+ }
+
+ /**
+  * Asks the query job manager to stop the query with the given id, then answers
+  * {@code BOOL_TRUE}.
+  */
+ @Override
+ public void stopQueryMaster(RpcController controller, TajoIdProtos.QueryIdProto request,
+                             RpcCallback<BoolProto> done) {
+   QueryId queryToStop = new QueryId(request);
+   context.getQueryJobManager().stopQuery(queryToStop);
+   done.run(BOOL_TRUE);
+ }
+
+ /**
+  * Answers with one resource entry per worker known to the resource manager, carrying the
+  * worker's host, peer RPC port, info (HTTP) port, query master port, memory and disk slots.
+  */
+ @Override
+ public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
+                                  RpcCallback<TajoMasterProtocol.WorkerResourcesRequest> done) {
+
+   TajoMasterProtocol.WorkerResourcesRequest.Builder builder =
+       TajoMasterProtocol.WorkerResourcesRequest.newBuilder();
+   Collection<Worker> workers = context.getResourceManager().getWorkers().values();
+
+   for (Worker worker : workers) {
+     WorkerResource resource = worker.getResource();
+
+     TajoMasterProtocol.WorkerResourceProto.Builder workerResource =
+         TajoMasterProtocol.WorkerResourceProto.newBuilder();
+
+     workerResource.setHost(worker.getHostName());
+     workerResource.setPeerRpcPort(worker.getPeerRpcPort());
+     workerResource.setInfoPort(worker.getHttpPort());
+     // Set once; the original repeated this identical call after setDiskSlots.
+     workerResource.setQueryMasterPort(worker.getQueryMasterPort());
+     workerResource.setMemoryMB(resource.getMemoryMB());
+     workerResource.setDiskSlots(resource.getDiskSlots());
+
+     builder.addWorkerResources(workerResource);
+   }
+   done.run(builder.build());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
new file mode 100644
index 0000000..1e6655c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+
+import java.util.Collection;
+
+/**
+ * Event that pairs an execution block with a collection of containers and an
+ * {@link EventType} saying whether those containers are to be launched or cleaned up.
+ */
+public class TaskRunnerGroupEvent extends AbstractEvent<EventType> {
+
+  /** The two requests this event can express. */
+  public enum EventType {
+    CONTAINER_REMOTE_LAUNCH,
+    CONTAINER_REMOTE_CLEANUP
+  }
+
+  /** Execution block the containers belong to. */
+  protected final ExecutionBlockId executionBlockId;
+
+  /** Containers the event applies to; stored as passed in, without copying. */
+  protected final Collection<Container> containers;
+
+  /**
+   * @param eventType        whether to launch or to clean up
+   * @param executionBlockId execution block the containers belong to
+   * @param containers       containers the event applies to
+   */
+  public TaskRunnerGroupEvent(EventType eventType,
+                              ExecutionBlockId executionBlockId,
+                              Collection<Container> containers) {
+    super(eventType);
+    this.containers = containers;
+    this.executionBlockId = executionBlockId;
+  }
+
+  /** @return the execution block given at construction */
+  public ExecutionBlockId getExecutionBlockId() {
+    return this.executionBlockId;
+  }
+
+  /** @return the same collection instance given at construction */
+  public Collection<Container> getContainers() {
+    return this.containers;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerLauncher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerLauncher.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerLauncher.java
new file mode 100644
index 0000000..9086e65
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerLauncher.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+
+/**
+ * Event handler specialised for {@link TaskRunnerGroupEvent}. The interface declares
+ * no members of its own; it only fixes the event type of {@code EventHandler}.
+ */
+public interface TaskRunnerLauncher extends EventHandler<TaskRunnerGroupEvent> {
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java
new file mode 100644
index 0000000..755df5a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+
+/**
+ * Plain holder for the values handed to a task scheduler: the query master task
+ * context, the leaf-query flag and the execution block id are fixed by the
+ * constructor, while the task size and the estimated task number are set later
+ * through their setters (both default to 0).
+ */
+public class TaskSchedulerContext {
+  private QueryMasterTask.QueryMasterTaskContext masterContext;
+  private boolean isLeafQuery;
+  private ExecutionBlockId blockId;
+  private int taskSize;
+  private int estimatedTaskNum;
+
+  /**
+   * @param masterContext context of the owning query master task
+   * @param isLeafQuery   value later returned by {@link #isLeafQuery()}
+   * @param blockId       execution block this context refers to
+   */
+  public TaskSchedulerContext(QueryMasterTask.QueryMasterTaskContext masterContext, boolean isLeafQuery,
+                              ExecutionBlockId blockId) {
+    this.blockId = blockId;
+    this.isLeafQuery = isLeafQuery;
+    this.masterContext = masterContext;
+  }
+
+  // ---- values fixed at construction ----
+
+  public QueryMasterTask.QueryMasterTaskContext getMasterContext() {
+    return this.masterContext;
+  }
+
+  public boolean isLeafQuery() {
+    return this.isLeafQuery;
+  }
+
+  public ExecutionBlockId getBlockId() {
+    return this.blockId;
+  }
+
+  // ---- values supplied after construction ----
+
+  public int getTaskSize() {
+    return this.taskSize;
+  }
+
+  public void setTaskSize(int taskSize) {
+    this.taskSize = taskSize;
+  }
+
+  public int getEstimatedTaskNum() {
+    return this.estimatedTaskNum;
+  }
+
+  public void setEstimatedTaskNum(int estimatedTaskNum) {
+    this.estimatedTaskNum = estimatedTaskNum;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
new file mode 100644
index 0000000..520ecd3
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.master.querymaster.SubQuery;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+/**
+ * Creates {@code AbstractTaskScheduler} instances reflectively. The scheduler class is
+ * read from the configuration key {@code tajo.querymaster.task-scheduler} and cached,
+ * and the {@code (TaskSchedulerContext, SubQuery)} constructor of each scheduler class
+ * is cached after its first lookup.
+ */
+public class TaskSchedulerFactory {
+  /**
+   * Scheduler class resolved from the configuration. Declared volatile so that a value
+   * published by one thread is visible to the others, matching the concurrent map used
+   * for the constructor cache.
+   */
+  private static volatile Class<? extends AbstractTaskScheduler> CACHED_ALGORITHM_CLASS;
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
+  private static final Class<?>[] DEFAULT_PARAMS = { TaskSchedulerContext.class, SubQuery.class };
+
+  /**
+   * Returns the configured scheduler class, resolving and caching it on first use.
+   *
+   * @param conf configuration consulted when nothing is cached yet
+   * @return the scheduler class, never {@code null}
+   * @throws IOException if the configuration does not name a scheduler class
+   */
+  public static Class<? extends AbstractTaskScheduler> getTaskSchedulerClass(Configuration conf)
+      throws IOException {
+    // Read the shared field once so the null check and the return see the same value.
+    Class<? extends AbstractTaskScheduler> schedulerClass = CACHED_ALGORITHM_CLASS;
+    if (schedulerClass == null) {
+      schedulerClass = conf.getClass("tajo.querymaster.task-scheduler", null, AbstractTaskScheduler.class);
+      if (schedulerClass == null) {
+        throw new IOException("Task scheduler is null");
+      }
+      CACHED_ALGORITHM_CLASS = schedulerClass;
+    }
+    return schedulerClass;
+  }
+
+  /**
+   * Instantiates {@code clazz} through its {@code (TaskSchedulerContext, SubQuery)} constructor.
+   *
+   * @param clazz    scheduler class to instantiate
+   * @param context  first constructor argument
+   * @param subQuery second constructor argument
+   * @return the new scheduler
+   * @throws RuntimeException wrapping any failure to find or invoke the constructor
+   */
+  public static <T extends AbstractTaskScheduler> T get(Class<T> clazz, TaskSchedulerContext context,
+                                                        SubQuery subQuery) {
+    T result;
+    try {
+      // Safe: the cache only ever stores, under key clazz, a constructor obtained from clazz itself.
+      @SuppressWarnings("unchecked")
+      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
+      if (constructor == null) {
+        constructor = clazz.getDeclaredConstructor(DEFAULT_PARAMS);
+        constructor.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(clazz, constructor);
+      }
+      result = constructor.newInstance(new Object[]{context, subQuery});
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return result;
+  }
+
+  /**
+   * Instantiates the scheduler class named by the configuration.
+   *
+   * @throws IOException if the configuration does not name a scheduler class
+   */
+  public static AbstractTaskScheduler get(Configuration conf, TaskSchedulerContext context, SubQuery subQuery)
+      throws IOException {
+    return get(getTaskSchedulerClass(conf), context, subQuery);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/TaskState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskState.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskState.java
new file mode 100644
index 0000000..67d2ebc
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TaskState.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+public enum TaskState {
+ NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/YarnContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
new file mode 100644
index 0000000..4f178fb
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.pullserver.PullServerAuxService;
+import org.apache.tajo.worker.TajoWorker;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.*;
+
+public class YarnContainerProxy extends ContainerProxy {
+ private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+ protected final YarnRPC yarnRPC;
+ final protected String containerMgrAddress;
+ protected Token containerToken;
+
+ public YarnContainerProxy(QueryMasterTask.QueryMasterTaskContext context, Configuration conf, YarnRPC yarnRPC,
+ Container container, ExecutionBlockId executionBlockId) {
+ super(context, conf, executionBlockId, container);
+ this.yarnRPC = yarnRPC;
+
+ NodeId nodeId = container.getNodeId();
+ this.containerMgrAddress = nodeId.getHost() + ":" + nodeId.getPort();
+ this.containerToken = container.getContainerToken();
+ }
+
+ /**
+  * Opens a {@code ContainerManagementProtocol} proxy to the given "host:port" address.
+  * With security enabled the proxy is created as a remote user named after the
+  * container id and carrying the converted container token; otherwise it is created
+  * as the current user.
+  *
+  * @param containerID              id used as the remote user name when security is on
+  * @param containerManagerBindAddr address in "host:port" form
+  * @param containerToken           token attached to the remote user when security is on
+  * @return the proxy; callers in this class release it with {@code yarnRPC.stopProxy}
+  * @throws IOException if the current user cannot be determined
+  */
+ protected ContainerManagementProtocol getCMProxy(ContainerId containerID,
+     final String containerManagerBindAddr,
+     Token containerToken)
+     throws IOException {
+   final String[] hostAndPort = containerManagerBindAddr.split(":");
+   final InetSocketAddress cmAddr =
+       new InetSocketAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
+
+   UserGroupInformation caller = UserGroupInformation.getCurrentUser();
+   if (UserGroupInformation.isSecurityEnabled()) {
+     org.apache.hadoop.security.token.Token<ContainerTokenIdentifier> securityToken =
+         ConverterUtils.convertFromYarn(containerToken, cmAddr);
+     // the user in createRemoteUser in this context has to be ContainerID
+     caller = UserGroupInformation.createRemoteUser(containerID.toString());
+     caller.addToken(securityToken);
+   }
+
+   return caller.doAs(new PrivilegedAction<ContainerManagementProtocol>() {
+     @Override
+     public ContainerManagementProtocol run() {
+       return (ContainerManagementProtocol) yarnRPC.getProxy(ContainerManagementProtocol.class,
+           cmAddr, conf);
+     }
+   });
+ }
+
+ /**
+  * Starts this container on its node manager.
+  *
+  * If the container was killed before launch, the state becomes DONE and nothing is
+  * started. Otherwise a start request built from {@code commonContainerLaunchContext}
+  * is sent, the pull server port is read from the response, the state becomes RUNNING
+  * and the proxy is registered with the resource allocator. Any failure is logged and
+  * leaves the state FAILED; nothing is rethrown to the caller.
+  *
+  * @param commonContainerLaunchContext shared launch context to specialise for this container
+  */
+ @Override
+ @SuppressWarnings("unchecked")
+ public synchronized void launch(ContainerLaunchContext commonContainerLaunchContext) {
+   LOG.info("Launching Container with Id: " + containerID);
+   if (this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
+     state = ContainerState.DONE;
+     // Fixed: the message previously opened a parenthesis that was never closed.
+     LOG.error("Container (" + containerID + ") was killed before it was launched");
+     return;
+   }
+
+   ContainerManagementProtocol proxy = null;
+   try {
+
+     proxy = getCMProxy(containerID, containerMgrAddress,
+         containerToken);
+
+     // Construct the actual Container
+     ContainerLaunchContext containerLaunchContext = createContainerLaunchContext(commonContainerLaunchContext);
+
+     // Now launch the actual container
+     List<StartContainerRequest> startRequestList = new ArrayList<StartContainerRequest>();
+     StartContainerRequest startRequest = Records
+         .newRecord(StartContainerRequest.class);
+     startRequest.setContainerLaunchContext(containerLaunchContext);
+     startRequestList.add(startRequest);
+     StartContainersRequest startRequests = Records.newRecord(StartContainersRequest.class);
+     startRequests.setStartContainerRequests(startRequestList);
+     StartContainersResponse response = proxy.startContainers(startRequests);
+
+     ByteBuffer portInfo = response.getAllServicesMetaData().get(PullServerAuxService.PULLSERVER_SERVICEID);
+
+     // The port keeps its previous value when the response carries no pull server metadata.
+     if (portInfo != null) {
+       port = PullServerAuxService.deserializeMetaData(portInfo);
+     }
+
+     LOG.info("PullServer port returned by ContainerManager for "
+         + containerID + " : " + port);
+
+     if (port < 0) {
+       this.state = ContainerState.FAILED;
+       // Caught by the catch block below, which logs it.
+       throw new IllegalStateException("Invalid shuffle port number "
+           + port + " returned for " + containerID);
+     }
+
+     this.state = ContainerState.RUNNING;
+     this.hostName = containerMgrAddress.split(":")[0];
+     context.getResourceAllocator().addContainer(containerID, this);
+   } catch (Throwable t) {
+     String message = "Container launch failed for " + containerID + " : "
+         + StringUtils.stringifyException(t);
+     this.state = ContainerState.FAILED;
+     LOG.error(message);
+   } finally {
+     // Always release the RPC proxy, whether or not the launch succeeded.
+     if (proxy != null) {
+       yarnRPC.stopProxy(proxy, conf);
+     }
+   }
+ }
+
+
+ /**
+  * Builds the launch context for this container from the shared one: the environment
+  * is copied, every service-data buffer is duplicated, and a single shell command is
+  * assembled that runs {@code TajoWorker} in "tr" mode with this proxy's id, node
+  * address, container id and task parameters, redirecting stdout/stderr to the log dir.
+  *
+  * @param commonContainerLaunchContext shared context supplying environment, service data and local resources
+  * @return a new launch context for this container
+  */
+ public ContainerLaunchContext createContainerLaunchContext(ContainerLaunchContext commonContainerLaunchContext) {
+   // Private copy of the shared environment.
+   Map<String, String> sharedEnv = commonContainerLaunchContext.getEnvironment();
+   Map<String, String> containerEnv = new HashMap<String, String>(sharedEnv.size());
+   containerEnv.putAll(sharedEnv);
+
+   // Duplicate the ByteBuffers for access by multiple containers.
+   Map<String, ByteBuffer> containerServiceData = new HashMap<String, ByteBuffer>();
+   for (Map.Entry<String, ByteBuffer> service : commonContainerLaunchContext.getServiceData().entrySet()) {
+     containerServiceData.put(service.getKey(), service.getValue().duplicate());
+   }
+
+   // Command line pieces, in order: java binary, heap size, main class, worker arguments.
+   Vector<CharSequence> arguments = new Vector<CharSequence>(30);
+   arguments.add("${JAVA_HOME}" + "/bin/java");
+   arguments.add("-Xmx2000m");
+   arguments.add(TajoWorker.class.getCanonicalName());
+   arguments.add("tr"); // workerMode
+   arguments.add(getId()); // subqueryId
+   arguments.add(containerMgrAddress); // nodeId
+   arguments.add(containerID.toString()); // containerId
+
+   Vector<CharSequence> extraParams = getTaskParams();
+   if (extraParams != null) {
+     arguments.addAll(extraParams);
+   }
+
+   arguments.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+   arguments.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+   // Join the pieces into the final command; each piece is followed by one space.
+   StringBuilder commandLine = new StringBuilder();
+   for (CharSequence argument : arguments) {
+     commandLine.append(argument).append(" ");
+   }
+
+   LOG.info("Completed setting up TaskRunner command " + commandLine.toString());
+   List<String> commandList = new ArrayList<String>();
+   commandList.add(commandLine.toString());
+
+   return BuilderUtils.newContainerLaunchContext(commonContainerLaunchContext.getLocalResources(),
+       containerEnv,
+       commandList,
+       containerServiceData,
+       null,
+       new HashMap<ApplicationAccessType, String>());
+ }
+
+ /**
+  * Builds the launch context shared by all containers: user token, environment
+  * (including the classpath), the system configuration file as a local resource, and
+  * the pull server service data.
+  *
+  * Failures while resolving the user, locating the system configuration or serialising
+  * the service data are logged and the context is still returned without that part.
+  *
+  * @param config   configuration; must be a {@code TajoConf} because it is cast to one
+  * @param queryId  not used by this method
+  * @param isMaster not used by this method
+  * @return the shared launch context
+  */
+ public static ContainerLaunchContext createCommonContainerLaunchContext(Configuration config,
+     String queryId, boolean isMaster) {
+   TajoConf conf = (TajoConf) config;
+
+   ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+
+   try {
+     ByteBuffer userToken = ByteBuffer.wrap(UserGroupInformation.getCurrentUser().getShortUserName().getBytes());
+     ctx.setTokens(userToken);
+   } catch (IOException e) {
+     // Fixed: report through the logger instead of printing the stack trace to stderr.
+     LOG.error(e.getMessage(), e);
+   }
+
+   ////////////////////////////////////////////////////////////////////////////
+   // Set the env variables to be setup
+   ////////////////////////////////////////////////////////////////////////////
+   LOG.info("Set the environment for the application master");
+
+   Map<String, String> environment = new HashMap<String, String>();
+   environment.put(ApplicationConstants.Environment.SHELL.name(), "/bin/bash");
+   if (System.getenv(ApplicationConstants.Environment.JAVA_HOME.name()) != null) {
+     environment.put(ApplicationConstants.Environment.JAVA_HOME.name(), System.getenv(ApplicationConstants.Environment.JAVA_HOME.name()));
+   }
+
+   // TODO - to be improved with org.apache.tajo.sh shell script
+   Properties prop = System.getProperties();
+
+   if (prop.getProperty("tajo.test", "FALSE").equalsIgnoreCase("TRUE") ||
+       (System.getenv("tajo.test") != null && System.getenv("tajo.test").equalsIgnoreCase("TRUE"))) {
+     // Test mode: reuse the classpath of the current JVM.
+     LOG.info("tajo.test is TRUE");
+     environment.put(ApplicationConstants.Environment.CLASSPATH.name(), prop.getProperty("java.class.path", null));
+     environment.put("tajo.test", "TRUE");
+   } else {
+     // Add AppMaster.jar location to classpath
+     // At some point we should not be required to add
+     // the hadoop specific classpaths to the env.
+     // It should be provided out of the box.
+     // For now setting all required classpaths including
+     // the classpath to "." for the application jar
+     StringBuilder classPathEnv = new StringBuilder("./");
+     for (String c : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH) {
+       classPathEnv.append(':');
+       classPathEnv.append(c.trim());
+     }
+
+     // Fixed: an unset TAJO_BASE_CLASSPATH used to be appended as the literal text "null".
+     String tajoBaseClasspath = System.getenv("TAJO_BASE_CLASSPATH");
+     if (tajoBaseClasspath != null) {
+       classPathEnv.append(':').append(tajoBaseClasspath);
+     }
+     classPathEnv.append(":./log4j.properties:./*");
+     if (System.getenv("HADOOP_HOME") != null) {
+       environment.put("HADOOP_HOME", System.getenv("HADOOP_HOME"));
+       environment.put(
+           ApplicationConstants.Environment.HADOOP_COMMON_HOME.name(),
+           System.getenv("HADOOP_HOME"));
+       environment.put(
+           ApplicationConstants.Environment.HADOOP_HDFS_HOME.name(),
+           System.getenv("HADOOP_HOME"));
+       environment.put(
+           ApplicationConstants.Environment.HADOOP_YARN_HOME.name(),
+           System.getenv("HADOOP_HOME"));
+     }
+
+     if (tajoBaseClasspath != null) {
+       environment.put("TAJO_BASE_CLASSPATH", tajoBaseClasspath);
+     }
+     environment.put(ApplicationConstants.Environment.CLASSPATH.name(), classPathEnv.toString());
+   }
+
+   ctx.setEnvironment(environment);
+
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("=================================================");
+     for (Map.Entry<String, String> entry : environment.entrySet()) {
+       LOG.debug(entry.getKey() + "=" + entry.getValue());
+     }
+     LOG.debug("=================================================");
+   }
+   ////////////////////////////////////////////////////////////////////////////
+   // Set the local resources
+   ////////////////////////////////////////////////////////////////////////////
+   Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+   LOG.info("defaultFS: " + conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
+
+   try {
+     FileSystem fs = FileSystem.get(conf);
+     FileContext fsCtx = FileContext.getFileContext(conf);
+     Path systemConfPath = TajoConf.getSystemConfPath(conf);
+     if (!fs.exists(systemConfPath)) {
+       // Only logged here; the resource lookup below is still attempted.
+       LOG.error("system_conf.xml (" + systemConfPath.toString() + ") Not Found");
+     }
+     LocalResource systemConfResource = createApplicationResource(fsCtx, systemConfPath, LocalResourceType.FILE);
+     localResources.put(TajoConstants.SYSTEM_CONF_FILENAME, systemConfResource);
+     ctx.setLocalResources(localResources);
+   } catch (IOException e) {
+     LOG.error(e.getMessage(), e);
+   }
+
+   Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+   try {
+     serviceData.put(PullServerAuxService.PULLSERVER_SERVICEID, PullServerAuxService.serializeMetaData(0));
+   } catch (IOException ioe) {
+     // Fixed: pass the exception as the throwable so its stack trace is logged,
+     // consistent with the handler above.
+     LOG.error(ioe.getMessage(), ioe);
+   }
+   ctx.setServiceData(serviceData);
+
+   return ctx;
+ }
+
+ private static LocalResource createApplicationResource(FileContext fs,
+ Path p, LocalResourceType type)
+ throws IOException {
+ LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
+ FileStatus rsrcStat = fs.getFileStatus(p);
+ rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
+ .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
+ rsrc.setSize(rsrcStat.getLen());
+ rsrc.setTimestamp(rsrcStat.getModificationTime());
+ rsrc.setType(type);
+ rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ return rsrc;
+ }
+
+ private static void writeConf(Configuration conf, Path queryConfFile)
+ throws IOException {
+ // Write job file to Tajo's fs
+ FileSystem fs = queryConfFile.getFileSystem(conf);
+ FSDataOutputStream out =
+ FileSystem.create(fs, queryConfFile,
+ new FsPermission(QUERYCONF_FILE_PERMISSION));
+ try {
+ conf.writeXml(out);
+ } finally {
+ out.close();
+ }
+ }
+
+ /**
+  * Stops this container. Nothing happens when the proxy is already completely done.
+  * A container still in PREP is only marked KILLED_BEFORE_LAUNCH. Otherwise a stop
+  * request is sent to the node manager and, on success, the container is removed from
+  * the resource allocator. The state ends as DONE whether or not the request
+  * succeeded; a failure is only logged as a warning.
+  */
+ @Override
+ public synchronized void stopContainer() {
+   if (isCompletelyDone()) {
+     return;
+   }
+
+   if (this.state == ContainerState.PREP) {
+     // Not launched yet: just record the kill so launch() can skip the start.
+     this.state = ContainerState.KILLED_BEFORE_LAUNCH;
+     return;
+   }
+
+   LOG.info("KILLING " + containerID);
+
+   ContainerManagementProtocol cmProxy = null;
+   try {
+     cmProxy = getCMProxy(this.containerID, this.containerMgrAddress,
+         this.containerToken);
+
+     // kill the remote container if already launched
+     List<ContainerId> idsToStop = new ArrayList<ContainerId>();
+     idsToStop.add(this.containerID);
+     StopContainersRequest stopRequest = Records.newRecord(StopContainersRequest.class);
+     stopRequest.setContainerIds(idsToStop);
+     cmProxy.stopContainers(stopRequest);
+     // If stopContainer returns without an error, assuming the stop made
+     // it over to the NodeManager.
+     context.getResourceAllocator().removeContainer(containerID);
+   } catch (Throwable t) {
+     // ignore the cleanup failure
+     String message = "cleanup failed for container "
+         + this.containerID + " : "
+         + StringUtils.stringifyException(t);
+     LOG.warn(message);
+     this.state = ContainerState.DONE;
+   } finally {
+     if (cmProxy != null) {
+       yarnRPC.stopProxy(cmProxy, conf);
+     }
+   }
+   this.state = ContainerState.DONE;
+ }
+
+ protected Vector<CharSequence> getTaskParams() {
+ String queryMasterHost = context.getQueryMasterContext().getWorkerContext()
+ .getTajoWorkerManagerService().getBindAddr().getHostName();
+ int queryMasterPort = context.getQueryMasterContext().getWorkerContext()
+ .getTajoWorkerManagerService().getBindAddr().getPort();
+
+ Vector<CharSequence> taskParams = new Vector<CharSequence>();
+ taskParams.add(queryMasterHost); // queryMaster hostname
+ taskParams.add(String.valueOf(queryMasterPort)); // queryMaster port
+ taskParams.add(context.getStagingDir().toString());
+ return taskParams;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java b/tajo-core/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
new file mode 100644
index 0000000..8b18b5a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class YarnTaskRunnerLauncherImpl extends AbstractService implements TaskRunnerLauncher {
+
+ /** Class Logger */
+ private static final Log LOG = LogFactory.getLog(YarnTaskRunnerLauncherImpl.class);
+ //private final YarnRPC yarnRPC;
+ private final static RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+ private QueryMasterTask.QueryMasterTaskContext context;
+
+ // For ContainerLauncherSpec
+ private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
+ private static String initialClasspath = null;
+ private static final Object classpathLock = new Object();
+ private ContainerLaunchContext commonContainerSpec = null;
+
+ final public static FsPermission QUERYCONF_FILE_PERMISSION =
+ FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+ /** for launching TaskRunners in parallel */
+ private final ExecutorService executorService;
+
+ private YarnRPC yarnRPC;
+
+ public YarnTaskRunnerLauncherImpl(QueryMasterTask.QueryMasterTaskContext context, YarnRPC yarnRPC) {
+ super(YarnTaskRunnerLauncherImpl.class.getName());
+ this.context = context;
+ this.yarnRPC = yarnRPC;
+ executorService = Executors.newFixedThreadPool(
+ context.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
+ }
+
+ /**
+  * Starts the service by delegating to the superclass; no launcher-specific work is done here.
+  */
+ public void start() {
+ super.start();
+ }
+
+ /**
+  * Shuts the launcher down: cancels pending launch/stop work, then asks every
+  * container proxy known to the resource allocator to stop, and finally stops the
+  * service itself. Stopping containers is best-effort: a failure on one proxy is
+  * logged and the remaining proxies are still processed.
+  */
+ public void stop() {
+   executorService.shutdownNow();
+
+   Map<ContainerId, ContainerProxy> containers = context.getResourceAllocator().getContainers();
+   // Iterate over a copy; presumably stopContainer() may remove entries from the
+   // allocator's map while we loop -- TODO confirm.
+   List<ContainerProxy> list = new ArrayList<ContainerProxy>(containers.values());
+   for (ContainerProxy eachProxy : list) {
+     try {
+       eachProxy.stopContainer();
+     } catch (Exception e) {
+       // Fixed: the failure used to be swallowed silently. Keep going, but leave a trace.
+       LOG.warn("Failed to stop a container while stopping the launcher", e);
+     }
+   }
+   super.stop();
+ }
+
+ /**
+  * Dispatches a group event: launch requests start task runners for the event's
+  * containers, cleanup requests stop them. Any other type is ignored.
+  */
+ @Override
+ public void handle(TaskRunnerGroupEvent event) {
+   final EventType requested = event.getType();
+
+   if (requested == EventType.CONTAINER_REMOTE_LAUNCH) {
+     launchTaskRunners(event.executionBlockId, event.getContainers());
+     return;
+   }
+
+   if (requested == EventType.CONTAINER_REMOTE_CLEANUP) {
+     stopTaskRunners(event.getContainers());
+   }
+ }
+
+ /**
+  * Rebuilds the shared launch context for the block's query and submits one launch
+  * job per container to the executor.
+  */
+ private void launchTaskRunners(ExecutionBlockId executionBlockId, Collection<Container> containers) {
+   final String queryId = executionBlockId.getQueryId().toString();
+   commonContainerSpec = YarnContainerProxy.createCommonContainerLaunchContext(getConfig(), queryId, false);
+
+   for (Container target : containers) {
+     final ContainerProxy containerProxy =
+         new YarnContainerProxy(context, getConfig(), yarnRPC, target, executionBlockId);
+     executorService.submit(new LaunchRunner(target.getId(), containerProxy));
+   }
+ }
+
+ /**
+  * Executor job that launches one container proxy with the launcher's current
+  * shared launch context and logs the container id afterwards.
+  */
+ protected class LaunchRunner implements Runnable {
+   private final ContainerId id;
+   private final ContainerProxy proxy;
+
+   public LaunchRunner(ContainerId id, ContainerProxy proxy) {
+     this.id = id;
+     this.proxy = proxy;
+   }
+
+   @Override
+   public void run() {
+     // Reads the enclosing launcher's field at run time, not at submission time.
+     this.proxy.launch(commonContainerSpec);
+     LOG.info("ContainerProxy started:" + this.id);
+   }
+ }
+
+ /**
+  * Submits one stop job per container to the executor. A container for which the
+  * resource allocator has no proxy is skipped with a warning.
+  */
+ private void stopTaskRunners(Collection<Container> containers) {
+   for (Container container : containers) {
+     final ContainerProxy proxy = context.getResourceAllocator().getContainer(container.getId());
+     if (proxy == null) {
+       // Without this guard the stop job would fail with a NullPointerException on the
+       // executor thread. NOTE(review): assumes getContainer() returns null for an
+       // unknown id -- confirm against the allocator.
+       LOG.warn("No ContainerProxy found for " + container.getId() + "; skip stopping it");
+       continue;
+     }
+     executorService.submit(new StopContainerRunner(container.getId(), proxy));
+   }
+ }
+
+ /**
+  * Executor job that stops one container proxy and logs the container id afterwards.
+  */
+ private static class StopContainerRunner implements Runnable {
+   private final ContainerId id;
+   private final ContainerProxy proxy;
+
+   public StopContainerRunner(ContainerId id, ContainerProxy proxy) {
+     this.proxy = proxy;
+     this.id = id;
+   }
+
+   @Override
+   public void run() {
+     this.proxy.stopContainer();
+     LOG.info("ContainerProxy stopped:" + this.id);
+   }
+ }
+
+
+ /**
+ * Lock this on initialClasspath so that there is only one fork in the AM for
+ * getting the initial class-path. TODO: We already construct
+ * a parent CLC and use it for all the containers, so this should go away
+ * once the mr-generated-classpath stuff is gone.
+ */
+ private static String getInitialClasspath(Configuration conf) {
+ synchronized (classpathLock) {
+ if (initialClasspathFlag.get()) {
+ return initialClasspath;
+ }
+ Map<String, String> env = new HashMap<String, String>();
+
+ initialClasspath = env.get(Environment.CLASSPATH.name());
+ initialClasspathFlag.set(true);
+ return initialClasspath;
+ }
+ }
+
+// public class TaskRunnerContainerProxy extends ContainerProxy {
+// private final ExecutionBlockId executionBlockId;
+//
+// public TaskRunnerContainerProxy(QueryMasterTask.QueryContext context, Configuration conf, YarnRPC yarnRPC,
+// Container container, ExecutionBlockId executionBlockId) {
+// super(context, conf, yarnRPC, container);
+// this.executionBlockId = executionBlockId;
+// }
+//
+// @Override
+// protected void containerStarted() {
+// context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
+// }
+//
+// @Override
+// protected String getId() {
+// return executionBlockId.toString();
+// }
+//
+// @Override
+// protected String getRunnerClass() {
+// return TaskRunner.class.getCanonicalSignature();
+// }
+//
+// @Override
+// protected Vector<CharSequence> getTaskParams() {
+// Vector<CharSequence> taskParams = new Vector<CharSequence>();
+// taskParams.add(queryMasterHost); // queryMaster hostname
+// taskParams.add(String.valueOf(queryMasterPort)); // queryMaster port
+//
+// return taskParams;
+// }
+// }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/cluster/ServerName.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/cluster/ServerName.java b/tajo-core/src/main/java/org/apache/tajo/master/cluster/ServerName.java
new file mode 100644
index 0000000..028af65
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/cluster/ServerName.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.cluster;
+
+public class ServerName implements Comparable<ServerName> {
+ /**
+ * This character is used as separator between server hostname and port.
+ */
+ public static final String SERVERNAME_SEPARATOR = ":";
+
+ private final String serverName;
+ private final String hostname;
+ private final int port;
+
+
+ public ServerName(final String hostname, final int port) {
+ this.hostname = hostname;
+ this.port = port;
+ this.serverName = getServerName(hostname, port);
+ }
+
+ public ServerName(final String serverName) {
+ this(parseHostname(serverName), parsePort(serverName));
+ }
+
+ public static ServerName create(final String serverName) {
+ return new ServerName(serverName);
+ }
+
+ public static ServerName createWithDefaultPort(final String serverName,
+ final int defaultPort) {
+ if (serverName == null || serverName.length() <= 0) {
+ throw new IllegalArgumentException("Passed hostname is null or empty ("
+ + serverName + ")");
+ }
+ int index = serverName.indexOf(SERVERNAME_SEPARATOR);
+ if (index == -1) {
+ return new ServerName(parseHostname(serverName), defaultPort);
+ } else {
+ return new ServerName(parseHostname(serverName), parsePort(serverName));
+ }
+ }
+
+ public static String parseHostname(final String serverName) {
+ if (serverName == null || serverName.length() <= 0) {
+ throw new IllegalArgumentException("Passed hostname is null or empty ("
+ + serverName + ")");
+ }
+ int index = serverName.indexOf(SERVERNAME_SEPARATOR);
+ if (index == -1) { // if a port is missing, the index will be set to -1.
+ throw new IllegalArgumentException("Passed port is missing ("
+ + serverName + ")");
+ }
+ return serverName.substring(0, index);
+ }
+
+ public static int parsePort(final String serverName) {
+ String [] split = serverName.split(SERVERNAME_SEPARATOR);
+ return Integer.parseInt(split[1]);
+ }
+
+ @Override
+ public String toString() {
+ return getServerName();
+ }
+
+ public String getServerName() {
+ return serverName;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public static String getServerName(String hostName, int port) {
+ final StringBuilder name = new StringBuilder(hostName.length() + 4);
+ name.append(hostName);
+ name.append(SERVERNAME_SEPARATOR);
+ name.append(port);
+ return name.toString();
+ }
+
+ @Override
+ public int compareTo(ServerName other) {
+ int compare = this.getHostname().toLowerCase().
+ compareTo(other.getHostname().toLowerCase());
+ if (compare != 0) return compare;
+ return this.getPort() - other.getPort();
+ }
+
+ @Override
+ public int hashCode() {
+ return getServerName().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null) return false;
+ if (!(o instanceof ServerName)) return false;
+ return this.compareTo((ServerName)o) == 0;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
new file mode 100644
index 0000000..c3a9a59
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.ExecutionBlockId;
+
+/**
+ * Event of type {@link ContainerAllocatorEventType} that bundles an execution
+ * block id with a priority, a resource, a required count, a leaf-query flag and
+ * a progress value. All fields are set once in the constructor.
+ */
+public class ContainerAllocationEvent extends AbstractEvent<ContainerAllocatorEventType> {
+
+  private final ExecutionBlockId executionBlockId;
+  private final Priority priority;
+  private final Resource resource;
+  private final int requiredNum;
+  private final boolean isLeafQuery;
+  private final float progress;
+
+  /**
+   * @param eventType type passed to the {@link AbstractEvent} constructor
+   * @param executionBlockId id returned by {@link #getExecutionBlockId()}
+   * @param priority value returned by {@link #getPriority()}
+   * @param resource value returned by {@link #getResource()} and {@link #getCapability()}
+   * @param requiredNum value returned by {@link #getRequiredNum()}
+   *                    (presumably the number of containers wanted; confirm with the allocator)
+   * @param isLeafQuery value returned by {@link #isLeafQuery()}
+   * @param progress value returned by {@link #getProgress()}
+   */
+  public ContainerAllocationEvent(ContainerAllocatorEventType eventType,
+                                  ExecutionBlockId executionBlockId,
+                                  Priority priority,
+                                  Resource resource,
+                                  int requiredNum,
+                                  boolean isLeafQuery, float progress) {
+    super(eventType);
+    this.executionBlockId = executionBlockId;
+    this.priority = priority;
+    this.resource = resource;
+    this.requiredNum = requiredNum;
+    this.isLeafQuery = isLeafQuery;
+    this.progress = progress;
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return this.executionBlockId;
+  }
+
+  public Priority getPriority() {
+    return this.priority;
+  }
+
+  /** Returns the same object as {@link #getCapability()}. */
+  public Resource getResource() {
+    return this.resource;
+  }
+
+  /** Returns the same object as {@link #getResource()}. */
+  public Resource getCapability() {
+    return this.resource;
+  }
+
+  public int getRequiredNum() {
+    return this.requiredNum;
+  }
+
+  public boolean isLeafQuery() {
+    return this.isLeafQuery;
+  }
+
+  public float getProgress() {
+    return this.progress;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java
new file mode 100644
index 0000000..4d10efe
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+/**
+ * Event types carried by {@link ContainerAllocationEvent}.
+ */
+public enum ContainerAllocatorEventType {
+ // producer: QueryUnitAttempt, consumer: ContainerAllocator
+ // NOTE(review): the per-constant meanings below are inferred from the names
+ // only; confirm against the ContainerAllocator's event handler.
+ // Presumably: a request for containers.
+ CONTAINER_REQ,
+ // Presumably: containers are being released.
+ CONTAINER_DEALLOCATE,
+ // Presumably: a container failed.
+ CONTAINER_FAILED
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerEvent.java
new file mode 100644
index 0000000..723ac1a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.master.event.ContainerEvent.EventType;
+
+/**
+ * Event about a single container, identified by its {@link ContainerId}.
+ */
+public class ContainerEvent extends AbstractEvent<EventType> {
+  /** Kinds of container event this class can carry. */
+  public enum EventType {
+    CONTAINER_LAUNCHED,
+    CONTAINER_STOPPED
+  }
+
+  private final ContainerId cId;
+
+  /**
+   * @param eventType type passed to the {@link AbstractEvent} constructor
+   * @param cId id of the container this event refers to
+   */
+  public ContainerEvent(EventType eventType, ContainerId cId) {
+    super(eventType);
+    this.cId = cId;
+  }
+
+  /**
+   * Returns the container id given at construction. Without this accessor the
+   * stored id could never be read by an event handler.
+   */
+  public ContainerId getContainerId() {
+    return cId;
+  }
+}