You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/14 08:48:02 UTC
[4/8] TAJO-91: Launch QueryMaster on NodeManager per query.
(hyoungjunkim via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
new file mode 100644
index 0000000..871ba77
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -0,0 +1,817 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.tajo.*;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.global.GlobalOptimizer;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.ExprType;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.ipc.QueryMasterManagerProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.master.*;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.rm.RMContainerAllocator;
+import org.apache.tajo.rpc.ProtoAsyncRpcServer;
+import org.apache.tajo.rpc.ProtoBlockingRpcClient;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.util.TajoIdUtils;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+// TODO - when exception, send error status to QueryMasterManager
+public class QueryMaster extends CompositeService implements EventHandler {
+ private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
+ private static PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
+ private static PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+ private static int QUERY_SESSION_TIMEOUT = 60 * 1000; //60 sec
+
+ // AppMaster Common
+ private final long appSubmitTime;
+ private Clock clock;
+
+ // For Query
+ private final QueryId queryId;
+ private QueryContext queryContext;
+ private Query query;
+ private TajoProtos.QueryState state = TajoProtos.QueryState.QUERY_NOT_ASSIGNED;
+ private String statusMessage;
+ private MasterPlan masterPlan;
+
+ private AsyncDispatcher dispatcher;
+ private RMContainerAllocator rmAllocator;
+
+ //service handler for QueryMasterManager, Worker
+ private QueryMasterService queryMasterService;
+ private QueryMasterClientService queryMasterClientService;
+
+ private TaskRunnerLauncher taskRunnerLauncher;
+ private GlobalPlanner globalPlanner;
+ private GlobalOptimizer globalOptimizer;
+
+ private boolean isCreateTableStmt;
+ private StorageManager storageManager;
+ private Path outputPath;
+ private QueryConf queryConf;
+ private ApplicationAttemptId appAttemptId;
+ private ApplicationId appId;
+ private ProtoBlockingRpcClient queryMasterManagerClient;
+ private QueryMasterManagerProtocol.QueryMasterManagerProtocolService.BlockingInterface queryMasterManagerService;
+
+ private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>();
+
+ private String queryMasterManagerAddress;
+
+ private YarnRPC yarnRPC;
+
+ private YarnClient yarnClient;
+
+ private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread;
+
+ public QueryMaster(final QueryId queryId, final long appSubmitTime, String queryMasterManagerAddress) {
+ super(QueryMaster.class.getName());
+
+ this.queryId = queryId;
+ this.appSubmitTime = appSubmitTime;
+ this.appId = queryId.getApplicationId();
+ this.queryMasterManagerAddress = queryMasterManagerAddress;
+
+ LOG.info("Created Query Master for " + queryId);
+ }
+
+ public void init(Configuration conf) {
+ try {
+ queryConf = new QueryConf(conf);
+ QUERY_SESSION_TIMEOUT = 60 * 1000;//queryConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
+ queryContext = new QueryContext(queryConf);
+ yarnRPC = YarnRPC.create(queryContext.getConf());
+ connectYarnClient();
+
+ LOG.info("Init QueryMasterManagerClient connection to:" + queryMasterManagerAddress);
+ InetSocketAddress addr = NetUtils.createSocketAddr(queryMasterManagerAddress);
+ queryMasterManagerClient = new ProtoBlockingRpcClient(QueryMasterManagerProtocol.class, addr);
+ queryMasterManagerService = queryMasterManagerClient.getStub();
+
+ clock = new SystemClock();
+
+ this.dispatcher = new AsyncDispatcher();
+ addIfService(dispatcher);
+
+ this.storageManager = new StorageManager(queryConf);
+
+ globalPlanner = new GlobalPlanner(queryConf, storageManager, dispatcher.getEventHandler());
+ globalOptimizer = new GlobalOptimizer();
+
+ queryMasterService = new QueryMasterService();
+ addIfService(queryMasterService);
+
+ queryMasterClientService = new QueryMasterClientService(queryContext);
+ addIfService(queryMasterClientService);
+
+ initStagingDir();
+
+ dispatcher.register(SubQueryEventType.class, new SubQueryEventDispatcher());
+ dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+ dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
+ dispatcher.register(QueryFinishEvent.EventType.class, new QueryFinishEventHandler());
+ dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
+
+ clientSessionTimeoutCheckThread = new ClientSessionTimeoutCheckThread();
+
+ clientSessionTimeoutCheckThread.start();
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ throw new RuntimeException(t);
+ }
+ super.init(conf);
+ }
+
+ class ClientSessionTimeoutCheckThread extends Thread {
+ public void run() {
+ LOG.info("ClientSessionTimeoutCheckThread started");
+ while(true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ break;
+ }
+ try {
+ long lastHeartbeat = queryContext.getLastClientHeartbeat();
+ long time = System.currentTimeMillis() - lastHeartbeat;
+ if(lastHeartbeat > 0 && time > QUERY_SESSION_TIMEOUT) {
+ LOG.warn("Query " + queryId + " stopped cause query sesstion timeout: " + time + " ms");
+ QueryMaster.this.stop();
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ class QueryHeartbeatThread extends Thread {
+ public QueryHeartbeatThread() {
+ super("QueryHeartbeatThread");
+ }
+
+ @Override
+ public void run() {
+ LOG.info("Start QueryMaster heartbeat thread");
+ while(queryMasterManagerClient.isConnected()) {
+ QueryMasterManagerProtocol.QueryHeartbeat queryHeartbeat =
+ QueryMasterManagerProtocol.QueryHeartbeat.newBuilder()
+ .setQueryMasterHost(queryMasterService.bindAddr.getHostName())
+ .setQueryMasterPort(queryMasterService.bindAddr.getPort())
+ .setQueryMasterClientPort(queryMasterClientService.getBindAddr().getPort())
+ .setState(state)
+ .setQueryId(queryId.getProto())
+ .build();
+
+ try {
+ QueryMasterManagerProtocol.QueryHeartbeatResponse response =
+ queryMasterManagerService.queryHeartbeat(null, queryHeartbeat);
+ if(response.getResponseCommand() != null) {
+ if("executeQuery".equals(response.getResponseCommand().getCommand())) {
+ appAttemptId = TajoIdUtils.toApplicationAttemptId(response.getResponseCommand().getParams(0));
+ startQuery(response.getResponseCommand().getParams(1),
+ response.getResponseCommand().getParams(2));
+ }
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ LOG.info("QueryMaster heartbeat thread stopped");
+ }
+ }
+
+ // TODO blocking/nonblocking ???
+ class QueryMasterService extends AbstractService implements QueryMasterProtocol.QueryMasterProtocolService.Interface {
+ private ProtoAsyncRpcServer rpcServer;
+ private InetSocketAddress bindAddr;
+ private String addr;
+ private QueryHeartbeatThread queryHeartbeatThread;
+
+ public QueryMasterService() {
+ super(QueryMasterService.class.getName());
+
+ // Setup RPC server
+ try {
+ InetSocketAddress initIsa =
+ new InetSocketAddress(InetAddress.getLocalHost(), 0);
+ if (initIsa.getAddress() == null) {
+ throw new IllegalArgumentException("Failed resolve of " + initIsa);
+ }
+
+ this.rpcServer = new ProtoAsyncRpcServer(QueryMasterProtocol.class, this, initIsa);
+ this.rpcServer.start();
+
+ this.bindAddr = rpcServer.getBindAddress();
+ this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ // Get the master address
+ LOG.info(QueryMasterService.class.getSimpleName() + " is bind to " + addr);
+ queryConf.setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+ }
+
+ @Override
+ public void start() {
+ try {
+ queryHeartbeatThread = new QueryHeartbeatThread();
+ queryHeartbeatThread.start();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ // TODO - set query status failed and stop QueryMaster
+ }
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ if(rpcServer != null) {
+ rpcServer.shutdown();
+ }
+ if(queryHeartbeatThread != null) {
+ queryHeartbeatThread.interrupt();
+ }
+ if(yarnClient != null) {
+ yarnClient.stop();
+ }
+ if(clientSessionTimeoutCheckThread != null) {
+ clientSessionTimeoutCheckThread.interrupt();
+ }
+ LOG.info("QueryMasterService stopped");
+ super.stop();
+ }
+
+ @Override
+ public void getTask(RpcController controller, YarnProtos.ContainerIdProto request,
+ RpcCallback<QueryMasterProtocol.QueryUnitRequestProto> done) {
+ queryContext.getEventHandler().handle(new TaskRequestEvent(new ContainerIdPBImpl(request), done));
+ }
+
+ @Override
+ public void statusUpdate(RpcController controller, QueryMasterProtocol.TaskStatusProto request,
+ RpcCallback<PrimitiveProtos.BoolProto> done) {
+ QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
+ queryContext.getEventHandler().handle(new TaskAttemptStatusUpdateEvent(attemptId, request));
+ done.run(TRUE_PROTO);
+ }
+
+ @Override
+ public void ping(RpcController controller,
+ TajoIdProtos.QueryUnitAttemptIdProto attemptIdProto,
+ RpcCallback<PrimitiveProtos.BoolProto> done) {
+ // TODO - to be completed
+ QueryUnitAttemptId attemptId = new QueryUnitAttemptId(attemptIdProto);
+ done.run(TRUE_PROTO);
+ }
+
+ @Override
+ public void fatalError(RpcController controller, QueryMasterProtocol.TaskFatalErrorReport report,
+ RpcCallback<PrimitiveProtos.BoolProto> done) {
+ queryContext.getEventHandler().handle(new TaskFatalErrorEvent(report));
+ done.run(TRUE_PROTO);
+ }
+
+ @Override
+ public void done(RpcController controller, QueryMasterProtocol.TaskCompletionReport report,
+ RpcCallback<PrimitiveProtos.BoolProto> done) {
+ queryContext.getEventHandler().handle(new TaskCompletionEvent(report));
+ done.run(TRUE_PROTO);
+ }
+
+ @Override
+ public void executeQuery(RpcController controller, PrimitiveProtos.StringProto request,
+ RpcCallback<PrimitiveProtos.BoolProto> done) {
+ }
+ }
+
+ public void start() {
+ super.start();
+ }
+
+ public void stop() {
+ LOG.info("unregisterApplicationMaster");
+ if(rmAllocator != null) {
+ try {
+ FinalApplicationStatus status = FinalApplicationStatus.UNDEFINED;
+ if (query != null) {
+ TajoProtos.QueryState state = query.getState();
+ if (state == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+ status = FinalApplicationStatus.SUCCEEDED;
+ } else if (state == TajoProtos.QueryState.QUERY_FAILED || state == TajoProtos.QueryState.QUERY_ERROR) {
+ status = FinalApplicationStatus.FAILED;
+ } else if (state == TajoProtos.QueryState.QUERY_ERROR) {
+ status = FinalApplicationStatus.FAILED;
+ }
+ }
+ this.rmAllocator.unregisterApplicationMaster(status, "tajo query finished", null);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ // TODO - release opened resource
+ if(this.queryMasterManagerClient != null) {
+ reportQueryStatus();
+
+ queryMasterManagerClient.close();
+ }
+
+ try {
+ FileSystem.closeAll();
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+
+ super.stop();
+
+ synchronized(queryId) {
+ queryId.notifyAll();
+ }
+ }
+
+ private void reportQueryStatus() {
+ //send query status heartbeat
+ QueryMasterManagerProtocol.QueryHeartbeat.Builder queryHeartbeatBuilder =
+ QueryMasterManagerProtocol.QueryHeartbeat.newBuilder()
+ .setQueryMasterHost(queryMasterService.bindAddr.getHostName())
+ .setQueryMasterPort(queryMasterService.bindAddr.getPort())
+ .setQueryMasterClientPort(queryMasterClientService.getBindAddr().getPort())
+ .setState(state)
+ .setQueryId(queryId.getProto());
+
+ if(statusMessage != null) {
+ queryHeartbeatBuilder.setStatusMessage(statusMessage);
+ }
+ try {
+ queryMasterManagerService.queryHeartbeat(null, queryHeartbeatBuilder.build());
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ private void connectYarnClient() {
+ this.yarnClient = new YarnClientImpl();
+ this.yarnClient.init(queryConf);
+ this.yarnClient.start();
+ }
+
+ protected void addIfService(Object object) {
+ if (object instanceof Service) {
+ addService((Service) object);
+ }
+ }
+
+ public synchronized void startQuery(String queryStr, String planJSON) {
+ LOG.info("Query Start:" + queryStr);
+ LOG.info("Plan JSON:" + planJSON);
+ if(query != null) {
+ LOG.warn("Query already started");
+ return;
+ }
+
+ try {
+ LogicalRootNode logicalNodeRoot = (LogicalRootNode) CoreGsonHelper.fromJson(planJSON, LogicalNode.class);
+ LogicalNode[] scanNodes = PlannerUtil.findAllNodes(logicalNodeRoot, ExprType.SCAN);
+ if(scanNodes != null) {
+ for(LogicalNode eachScanNode: scanNodes) {
+ ScanNode scanNode = (ScanNode)eachScanNode;
+ tableDescMap.put(scanNode.getFromTable().getTableName(), scanNode.getFromTable().getTableDesc());
+ }
+ }
+ MasterPlan globalPlan = globalPlanner.build(queryId, logicalNodeRoot);
+ this.masterPlan = globalOptimizer.optimize(globalPlan);
+
+ taskRunnerLauncher = new TaskRunnerLauncherImpl(queryContext);
+ addIfService(taskRunnerLauncher);
+ dispatcher.register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
+
+ ((TaskRunnerLauncherImpl)taskRunnerLauncher).init(queryConf);
+ ((TaskRunnerLauncherImpl)taskRunnerLauncher).start();
+
+ rmAllocator = new RMContainerAllocator(queryContext);
+ addIfService(rmAllocator);
+ dispatcher.register(ContainerAllocatorEventType.class, rmAllocator);
+
+ rmAllocator.init(queryConf);
+ rmAllocator.start();
+
+ //TODO - synchronized with executeQuery logic
+ query = new Query(queryContext, queryId, clock, appSubmitTime,
+ "", dispatcher.getEventHandler(), masterPlan, storageManager);
+ dispatcher.register(QueryEventType.class, query);
+
+ dispatcher.getEventHandler().handle(new QueryEvent(queryId,
+ QueryEventType.INIT));
+ dispatcher.getEventHandler().handle(new QueryEvent(queryId,
+ QueryEventType.START));
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ //send FAIL query status
+ this.statusMessage = StringUtils.stringifyException(e);
+ this.state = TajoProtos.QueryState.QUERY_FAILED;
+ }
+ }
+
+ @Override
+ public void handle(Event event) {
+ dispatcher.getEventHandler().handle(event);
+ }
+
+ public EventHandler getEventHandler() {
+ return dispatcher.getEventHandler();
+ }
+
+ private class SubQueryEventDispatcher implements EventHandler<SubQueryEvent> {
+ public void handle(SubQueryEvent event) {
+ SubQueryId id = event.getSubQueryId();
+ query.getSubQuery(id).handle(event);
+ }
+ }
+
+ private class TaskEventDispatcher
+ implements EventHandler<TaskEvent> {
+ public void handle(TaskEvent event) {
+ QueryUnitId taskId = event.getTaskId();
+ QueryUnit task = query.getSubQuery(taskId.getSubQueryId()).
+ getQueryUnit(taskId);
+ task.handle(event);
+ }
+ }
+
+ private class TaskAttemptEventDispatcher
+ implements EventHandler<TaskAttemptEvent> {
+ public void handle(TaskAttemptEvent event) {
+ QueryUnitAttemptId attemptId = event.getTaskAttemptId();
+ SubQuery subQuery = query.getSubQuery(attemptId.getSubQueryId());
+ QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
+ QueryUnitAttempt attempt = task.getAttempt(attemptId);
+ attempt.handle(event);
+ }
+ }
+
+ private class TaskSchedulerDispatcher
+ implements EventHandler<TaskSchedulerEvent> {
+ public void handle(TaskSchedulerEvent event) {
+ SubQuery subQuery = query.getSubQuery(event.getSubQueryId());
+ subQuery.getTaskScheduler().handle(event);
+ }
+ }
+
+ public QueryContext getContext() {
+ return this.queryContext;
+ }
+
+ public class QueryContext {
+ private QueryConf conf;
+ public Map<ContainerId, ContainerProxy> containers = new ConcurrentHashMap<ContainerId, ContainerProxy>();
+ int minCapability;
+ int maxCapability;
+ int numCluster;
+ AtomicLong lastClientHeartbeat = new AtomicLong(-1);
+
+ public QueryContext(QueryConf conf) {
+ this.conf = conf;
+ }
+
+ public QueryConf getConf() {
+ return conf;
+ }
+
+ public InetSocketAddress getQueryMasterServiceAddress() {
+ return queryMasterService.bindAddr;
+ }
+
+ public QueryMasterClientService getQueryMasterClientService() {
+ return queryMasterClientService;
+ }
+
+ public AsyncDispatcher getDispatcher() {
+ return dispatcher;
+ }
+
+ public Clock getClock() {
+ return clock;
+ }
+
+ public Query getQuery() {
+ return query;
+ }
+
+ public SubQuery getSubQuery(SubQueryId subQueryId) {
+ return query.getSubQuery(subQueryId);
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ public ApplicationId getApplicationId() {
+ return appId;
+ }
+
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return appAttemptId;
+ }
+
+ public EventHandler getEventHandler() {
+ return dispatcher.getEventHandler();
+ }
+
+ public void addContainer(ContainerId cId, ContainerProxy container) {
+ containers.put(cId, container);
+ }
+
+ public void removeContainer(ContainerId cId) {
+ containers.remove(cId);
+ }
+
+ public boolean containsContainer(ContainerId cId) {
+ return containers.containsKey(cId);
+ }
+
+ public ContainerProxy getContainer(ContainerId cId) {
+ return containers.get(cId);
+ }
+
+ public Map<ContainerId, ContainerProxy> getContainers() {
+ return containers;
+ }
+
+ public int getNumClusterNode() {
+ return numCluster;
+ }
+
+ public void setNumClusterNodes(int num) {
+ numCluster = num;
+ }
+
+// public CatalogService getCatalog() {
+// return catalog;
+// }
+
+ public Map<String, TableDesc> getTableDescMap() {
+ return tableDescMap;
+ }
+
+ public Path getOutputPath() {
+ return outputPath;
+ }
+
+ public void setMaxContainerCapability(int capability) {
+ this.maxCapability = capability;
+ }
+
+ public int getMaxContainerCapability() {
+ return this.maxCapability;
+ }
+
+ public void setMinContainerCapability(int capability) {
+ this.minCapability = capability;
+ }
+
+ public int getMinContainerCapability() {
+ return this.minCapability;
+ }
+
+ public boolean isCreateTableQuery() {
+ return isCreateTableStmt;
+ }
+
+ public float getProgress() {
+ if(query != null) {
+ return query.getProgress();
+ } else {
+ return 0;
+ }
+ }
+
+ public long getStartTime() {
+ if(query != null) {
+ return query.getStartTime();
+ } else {
+ return -1;
+ }
+ }
+
+ public long getFinishTime() {
+ if(query != null) {
+ return query.getFinishTime();
+ } else {
+ return -1;
+ }
+ }
+
+ public StorageManager getStorageManager() {
+ return storageManager;
+ }
+
+ public QueryMaster getQueryMaster() {
+ return QueryMaster.this;
+ }
+
+ public YarnRPC getYarnRPC() {
+ return yarnRPC;
+ }
+
+ public void setState(TajoProtos.QueryState state) {
+ QueryMaster.this.state = state;
+ }
+
+ public TajoProtos.QueryState getState() {
+ return state;
+ }
+
+ public void touchSessionTime() {
+ this.lastClientHeartbeat.set(System.currentTimeMillis());
+ }
+
+ public long getLastClientHeartbeat() {
+ return this.lastClientHeartbeat.get();
+ }
+ }
+
+ private class QueryFinishEventHandler implements EventHandler<QueryFinishEvent> {
+ @Override
+ public void handle(QueryFinishEvent event) {
+ LOG.info("Query end notification started for QueryId : " + query.getId() + "," + query.getState());
+
+ //QueryMaster must be lived until client fetching all query result data.
+ try {
+ // Stop all services
+ // This will also send the final report to the ResourceManager
+ //LOG.info("Calling stop for all the services");
+// stop();
+ } catch (Throwable t) {
+ LOG.warn("Graceful stop failed ", t);
+ }
+
+ //Bring the process down by force.
+ //Not needed after HADOOP-7140
+ //LOG.info("Exiting QueryMaster..GoodBye!");
+ }
+ }
+
+ // query submission directory is private!
+ final public static FsPermission USER_DIR_PERMISSION =
+ FsPermission.createImmutable((short) 0700); // rwx--------
+
+ /**
+ * It initializes the final output and staging directory and sets
+ * them to variables.
+ */
+ private void initStagingDir() throws IOException {
+ QueryConf conf = getContext().getConf();
+
+ String realUser;
+ String currentUser;
+ UserGroupInformation ugi;
+ ugi = UserGroupInformation.getLoginUser();
+ realUser = ugi.getShortUserName();
+ currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+
+ String givenOutputTableName = conf.getOutputTable();
+ Path stagingDir;
+
+ // If final output directory is not given by an user,
+ // we use the query id as a output directory.
+ if (givenOutputTableName.equals("")) {
+ this.isCreateTableStmt = false;
+ FileSystem defaultFS = FileSystem.get(conf);
+
+ Path homeDirectory = defaultFS.getHomeDirectory();
+ if (!defaultFS.exists(homeDirectory)) {
+ defaultFS.mkdirs(homeDirectory, new FsPermission(USER_DIR_PERMISSION));
+ }
+
+ Path userQueryDir = new Path(homeDirectory, TajoConstants.USER_QUERYDIR_PREFIX);
+
+ if (defaultFS.exists(userQueryDir)) {
+ FileStatus fsStatus = defaultFS.getFileStatus(userQueryDir);
+ String owner = fsStatus.getOwner();
+
+ if (!(owner.equals(currentUser) || owner.equals(realUser))) {
+ throw new IOException("The ownership on the user's query " +
+ "directory " + userQueryDir + " is not as expected. " +
+ "It is owned by " + owner + ". The directory must " +
+ "be owned by the submitter " + currentUser + " or " +
+ "by " + realUser);
+ }
+
+ if (!fsStatus.getPermission().equals(USER_DIR_PERMISSION)) {
+ LOG.info("Permissions on staging directory " + userQueryDir + " are " +
+ "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
+ "to correct value " + USER_DIR_PERMISSION);
+ defaultFS.setPermission(userQueryDir, new FsPermission(USER_DIR_PERMISSION));
+ }
+ } else {
+ defaultFS.mkdirs(userQueryDir,
+ new FsPermission(USER_DIR_PERMISSION));
+ }
+
+ stagingDir = StorageUtil.concatPath(userQueryDir, queryId.toString());
+
+ if (defaultFS.exists(stagingDir)) {
+ throw new IOException("The staging directory " + stagingDir
+ + "already exists. The directory must be unique to each query");
+ } else {
+ defaultFS.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
+ }
+
+ // Set the query id to the output table name
+ conf.setOutputTable(queryId.toString());
+
+ } else {
+ this.isCreateTableStmt = true;
+ Path warehouseDir = new Path(conf.getVar(TajoConf.ConfVars.ROOT_DIR),
+ TajoConstants.WAREHOUSE_DIR);
+ stagingDir = new Path(warehouseDir, conf.getOutputTable());
+
+ FileSystem fs = warehouseDir.getFileSystem(conf);
+ if (fs.exists(stagingDir)) {
+ throw new IOException("The staging directory " + stagingDir
+ + " already exists. The directory must be unique to each query");
+ } else {
+ // TODO - should have appropriate permission
+ fs.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
+ }
+ }
+
+ conf.setOutputPath(stagingDir);
+ outputPath = stagingDir;
+ LOG.info("Initialized Query Staging Dir: " + outputPath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
new file mode 100644
index 0000000..1a326fe
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.QueryMasterClientProtocol;
+import org.apache.tajo.rpc.ProtoBlockingRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.TajoIdUtils;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * Client-facing RPC service of a QueryMaster. Clients use it to poll query status, fetch the
+ * result table descriptor and kill the query.
+ *
+ * <p>The blocking RPC server is started in the constructor (not in {@link #start()}) because
+ * the QueryMaster heartbeat needs {@link #getBindAddr()} before services are started.
+ */
+public class QueryMasterClientService extends AbstractService {
+  private static final Log LOG = LogFactory.getLog(QueryMasterClientService.class);
+  private static final PrimitiveProtos.BoolProto BOOL_TRUE =
+      PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
+
+  private ProtoBlockingRpcServer rpcServer;
+  private InetSocketAddress bindAddr;
+  private String addr;
+  private QueryMaster.QueryContext queryContext;
+  private QueryMasterClientProtocolServiceHandler serviceHandler;
+
+  public QueryMasterClientService(QueryMaster.QueryContext queryContext) {
+    super(QueryMasterClientService.class.getName());
+
+    this.queryContext = queryContext;
+    this.serviceHandler = new QueryMasterClientProtocolServiceHandler();
+
+    // init RPC Server in constructor cause Heartbeat Thread use bindAddr
+    // Setup RPC server
+    try {
+      // TODO initial port num is value of config and find unused port with sequence
+      InetSocketAddress initIsa = new InetSocketAddress(InetAddress.getLocalHost(), 0);
+      if (initIsa.getAddress() == null) {
+        throw new IllegalArgumentException("Failed resolve of " + initIsa);
+      }
+
+      // TODO blocking/non-blocking??
+      this.rpcServer = new ProtoBlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa);
+      this.rpcServer.start();
+
+      this.bindAddr = rpcServer.getBindAddress();
+      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+    } catch (Exception e) {
+      // NOTE(review): on failure bindAddr stays null and getBindAddr() callers will NPE.
+      LOG.error(e.getMessage(), e);
+    }
+    // Get the master address
+    LOG.info(QueryMasterClientService.class.getSimpleName() + " is bind to " + addr);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  /** Shuts down the RPC server (if it was started) and stops the service. */
+  @Override
+  public void stop() {
+    if (rpcServer != null) {
+      rpcServer.shutdown();
+    }
+    LOG.info("QueryMasterClientService stopped");
+    super.stop();
+  }
+
+  /** @return the address the client RPC server is bound to, or null if it failed to start */
+  public InetSocketAddress getBindAddr() {
+    return bindAddr;
+  }
+
+  /** Blocking RPC handler implementing the client protocol against the owning QueryMaster. */
+  public class QueryMasterClientProtocolServiceHandler
+      implements QueryMasterClientProtocol.QueryMasterClientProtocolService.BlockingInterface {
+    @Override
+    public PrimitiveProtos.BoolProto updateSessionVariables(
+        RpcController controller,
+        ClientProtos.UpdateSessionVariableRequest request) throws ServiceException {
+      // NOTE(review): not implemented; returns null rather than a BoolProto.
+      return null;
+    }
+
+    /**
+     * Returns the result table descriptor when the query succeeded; otherwise an error
+     * message saying there is no query, the query failed, or it is still running.
+     */
+    @Override
+    public ClientProtos.GetQueryResultResponse getQueryResult(
+        RpcController controller,
+        ClientProtos.GetQueryResultRequest request) throws ServiceException {
+      QueryId queryId = new QueryId(request.getQueryId());
+      Query query = queryContext.getQuery();
+
+      ClientProtos.GetQueryResultResponse.Builder builder = ClientProtos.GetQueryResultResponse.newBuilder();
+
+      if (query == null) {
+        builder.setErrorMessage("No Query for " + queryId);
+      } else {
+        switch (query.getState()) {
+          case QUERY_SUCCEEDED:
+            builder.setTableDesc((CatalogProtos.TableDescProto) query.getResultDesc().getProto());
+            break;
+          case QUERY_FAILED:
+          case QUERY_ERROR:
+            builder.setErrorMessage("Query " + queryId + " is failed");
+            break;
+          default:
+            builder.setErrorMessage("Query " + queryId + " is still running");
+            break;
+        }
+      }
+      return builder.build();
+    }
+
+    /**
+     * Reports the query state and progress. Every call for a real query id also refreshes
+     * the client session time. The null query id is always reported as succeeded.
+     */
+    @Override
+    public ClientProtos.GetQueryStatusResponse getQueryStatus(
+        RpcController controller,
+        ClientProtos.GetQueryStatusRequest request) throws ServiceException {
+      ClientProtos.GetQueryStatusResponse.Builder builder
+          = ClientProtos.GetQueryStatusResponse.newBuilder();
+      QueryId queryId = new QueryId(request.getQueryId());
+      builder.setQueryId(request.getQueryId());
+
+      if (queryId.equals(TajoIdUtils.NullQueryId)) {
+        builder.setResultCode(ClientProtos.ResultCode.OK);
+        builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
+      } else {
+        Query query = queryContext.getQuery();
+        InetSocketAddress clientAddr = queryContext.getQueryMasterClientService().getBindAddr();
+        builder.setResultCode(ClientProtos.ResultCode.OK);
+        builder.setQueryMasterHost(clientAddr.getHostName());
+        builder.setQueryMasterPort(clientAddr.getPort());
+
+        // Polling keeps the client session alive (see the session timeout check).
+        queryContext.touchSessionTime();
+        if (query != null) {
+          TajoProtos.QueryState queryState = query.getState();
+          builder.setState(queryState);
+          builder.setProgress(query.getProgress());
+          builder.setSubmitTime(query.getAppSubmitTime());
+          builder.setInitTime(query.getInitializationTime());
+          builder.setHasResult(!query.isCreateTableStmt());
+          if (queryState == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+            builder.setFinishTime(query.getFinishTime());
+          } else {
+            builder.setFinishTime(System.currentTimeMillis());
+          }
+        } else {
+          builder.setState(queryContext.getState());
+        }
+      }
+
+      return builder.build();
+    }
+
+    /**
+     * Stops the QueryMaster asynchronously after a 1 second delay, so that this RPC
+     * response can be returned before the services shut down.
+     */
+    @Override
+    public PrimitiveProtos.BoolProto killQuery(
+        RpcController controller,
+        YarnProtos.ApplicationAttemptIdProto request) throws ServiceException {
+      LOG.info("Stop QueryMaster:" + queryContext.getQueryId());
+      Thread t = new Thread() {
+        @Override
+        public void run() {
+          try {
+            Thread.sleep(1000); // wait until the RPC response has been returned
+          } catch (InterruptedException ignored) {
+            // Nothing interrupts this thread; stop the QueryMaster regardless.
+          }
+          queryContext.getQueryMaster().stop();
+        }
+      };
+      t.start();
+      return BOOL_TRUE;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java
new file mode 100644
index 0000000..47adf7d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.ipc.QueryMasterManagerProtocol;
+import org.apache.tajo.ipc.QueryMasterManagerProtocol.QueryHeartbeatResponse;
+import org.apache.tajo.master.ContainerProxy;
+import org.apache.tajo.master.TajoMaster;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+// TODO - check QueryMaster status and if QueryMaster failed, release resource
+public class QueryMasterManager extends CompositeService {
+ private static final Log LOG = LogFactory.getLog(QueryMasterManager.class.getName());
+
+ // Master Context
+ private final TajoMaster.MasterContext masterContext;
+
+ // AppMaster Common
+ private final Clock clock;
+ private final long appSubmitTime;
+ private final ApplicationId appId;
+ private ApplicationAttemptId appAttemptId;
+
+ protected YarnClient yarnClient;
+
+ // For Query
+ private final QueryId queryId;
+
+ private AsyncDispatcher dispatcher;
+ private YarnRPC rpc;
+
+ private TajoProtos.QueryState state;
+ private float progress;
+ private long finishTime;
+ private TableDesc resultDesc;
+ private String queryMasterHost;
+ private int queryMasterPort;
+ private int queryMasterClientPort;
+
+ private LogicalRootNode plan;
+
+ private AtomicBoolean querySubmitted = new AtomicBoolean(false);
+
+ private AtomicBoolean queryMasterStopped = new AtomicBoolean(true);
+
+ private boolean stopCheckThreadStarted = false;
+
+ private String query;
+
+ public QueryMasterManager(final TajoMaster.MasterContext masterContext,
+ final YarnClient yarnClient,
+ final QueryId queryId,
+ final String query,
+ final LogicalRootNode plan,
+ final ApplicationId appId,
+ final Clock clock, long appSubmitTime) {
+ super(QueryMasterManager.class.getName());
+ this.masterContext = masterContext;
+ this.yarnClient = yarnClient;
+
+ this.appId = appId;
+ this.clock = clock;
+ this.appSubmitTime = appSubmitTime;
+ this.queryId = queryId;
+ this.plan = plan;
+ this.query = query;
+ LOG.info("Created Query Master Manager for AppId=" + appId + ", QueryID=" + queryId);
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+
+ state = TajoProtos.QueryState.QUERY_MASTER_INIT;
+ }
+
+ public TajoProtos.QueryState getState() {
+ return state;
+ }
+
+ @Override
+ public void start() {
+ try {
+ appAttemptId = allocateAndLaunchQueryMaster();
+ } catch (YarnRemoteException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ while(true) {
+ if(queryMasterStopped.get()) {
+ break;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ LOG.info("QueryMasterManager for " + queryId + " stopped");
+ super.stop();
+ }
+
+ public float getProgress() {
+ return progress;
+ }
+
+ public long getAppSubmitTime() {
+ return appSubmitTime;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ public TableDesc getResultDesc() {
+ return resultDesc;
+ }
+
+ public String getQueryMasterHost() {
+ return queryMasterHost;
+ }
+
+ public int getQueryMasterPort() {
+ return queryMasterPort;
+ }
+
+ public int getQueryMasterClientPort() {
+ return queryMasterClientPort;
+ }
+
+ public synchronized QueryHeartbeatResponse.ResponseCommand queryHeartbeat(QueryMasterManagerProtocol.QueryHeartbeat queryHeartbeat) {
+ this.queryMasterHost = queryHeartbeat.getQueryMasterHost();
+ this.queryMasterPort = queryHeartbeat.getQueryMasterPort();
+ this.queryMasterClientPort = queryHeartbeat.getQueryMasterClientPort();
+ this.state = queryHeartbeat.getState();
+ if(state == TajoProtos.QueryState.QUERY_FAILED) {
+ //TODO needed QueryMaster's detail status(failed before or after launching worker)
+ queryMasterStopped.set(true);
+ if(queryHeartbeat.getStatusMessage() != null) {
+ LOG.warn(queryId + " failed, " + queryHeartbeat.getStatusMessage());
+ }
+ }
+
+ if(!stopCheckThreadStarted && !queryMasterStopped.get() && isFinishState(this.state)) {
+ stopCheckThreadStarted = true;
+ startCheckingQueryMasterStop();
+ }
+ if(appAttemptId != null && !querySubmitted.get()) {
+ LOG.info("submitQuery to QueryMaster(" + queryMasterHost + ":" + queryMasterPort + ")");
+ queryMasterStopped.set(false);
+ querySubmitted.set(true);
+ List<String> params = new ArrayList<String>(3);
+ params.add(appAttemptId.toString());
+ params.add(query);
+ params.add(plan.toJson());
+ return QueryHeartbeatResponse.ResponseCommand.newBuilder()
+ .setCommand("executeQuery")
+ .addAllParams(params)
+ .build();
+ } else {
+ return null;
+ }
+ }
+
+ private boolean isFinishState(TajoProtos.QueryState state) {
+ return state == TajoProtos.QueryState.QUERY_FAILED ||
+ state == TajoProtos.QueryState.QUERY_KILLED ||
+ state == TajoProtos.QueryState.QUERY_SUCCEEDED;
+ }
+
+ private void startCheckingQueryMasterStop() {
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ ApplicationReport report = monitorApplication(appId,
+ EnumSet.of(
+ YarnApplicationState.FINISHED,
+ YarnApplicationState.KILLED,
+ YarnApplicationState.FAILED));
+ queryMasterStopped.set(true);
+ LOG.info("QueryMaster (" + queryId + ") stopped");
+ } catch (YarnRemoteException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ };
+
+ t.start();
+ }
+
+ private ApplicationAttemptId allocateAndLaunchQueryMaster() throws YarnRemoteException {
+ LOG.info("Allocate and launch QueryMaster:" + yarnClient);
+ ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+
+ // set the application id
+ appContext.setApplicationId(appId);
+ // set the application name
+ appContext.setApplicationName("Tajo");
+
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(5);
+ appContext.setPriority(pri);
+
+ // Set the queue to which this application is to be submitted in the RM
+ appContext.setQueue("default");
+
+ ContainerLaunchContext commonContainerLaunchContext =
+ ContainerProxy.createCommonContainerLaunchContext(masterContext.getConf());
+
+ // Setup environment by cloning from common env.
+ Map<String, String> env = commonContainerLaunchContext.getEnvironment();
+ Map<String, String> myEnv = new HashMap<String, String>(env.size());
+ myEnv.putAll(env);
+
+ ////////////////////////////////////////////////////////////////////////////
+ // Set the local resources
+ ////////////////////////////////////////////////////////////////////////////
+ // Set the necessary command to execute the application master
+ Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+ // Set java executable command
+ //LOG.info("Setting up app master command");
+ vargs.add("${JAVA_HOME}" + "/bin/java");
+ // Set Xmx based on am memory size
+ vargs.add("-Xmx2000m");
+ // Set Remote Debugging
+ //if (!context.getQuery().getSubQuery(event.getSubQueryId()).isLeafQuery()) {
+ //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
+ //}
+ // Set class name
+ vargs.add(QueryMasterRunner.class.getCanonicalName());
+ vargs.add(queryId.toString()); // queryId
+ vargs.add(String.valueOf(appSubmitTime));
+ vargs.add(masterContext.getQueryMasterManagerService().getBindAddress().getHostName() + ":" +
+ masterContext.getQueryMasterManagerService().getBindAddress().getPort());
+
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+ // Get final commmand
+ StringBuilder command = new StringBuilder();
+ for (CharSequence str : vargs) {
+ command.append(str).append(" ");
+ }
+
+ LOG.info("Completed setting up QueryMasterRunner command " + command.toString());
+ List<String> commands = new ArrayList<String>();
+ commands.add(command.toString());
+
+ final Resource resource = Records.newRecord(Resource.class);
+ // TODO - get default value from conf
+ resource.setMemory(2048);
+ resource.setVirtualCores(1);
+
+ Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
+
+ ContainerLaunchContext masterContainerContext = BuilderUtils.newContainerLaunchContext(
+ null, commonContainerLaunchContext.getUser(),
+ resource, commonContainerLaunchContext.getLocalResources(), myEnv, commands,
+ myServiceData, null, new HashMap<ApplicationAccessType, String>(2));
+
+ appContext.setAMContainerSpec(masterContainerContext);
+
+ LOG.info("Submitting QueryMaster to ResourceManager");
+ yarnClient.submitApplication(appContext);
+
+ ApplicationReport appReport = monitorApplication(appId, EnumSet.of(YarnApplicationState.ACCEPTED));
+ ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
+
+ LOG.info("Launching QueryMaster with id: " + attemptId);
+
+ state = TajoProtos.QueryState.QUERY_MASTER_LAUNCHED;
+
+ return attemptId;
+ }
+
+ private ApplicationReport monitorApplication(ApplicationId appId,
+ Set<YarnApplicationState> finalState) throws YarnRemoteException {
+
+ long sleepTime = 100;
+ int count = 1;
+ while (true) {
+ // Get application report for the appId we are interested in
+ ApplicationReport report = yarnClient.getApplicationReport(appId);
+
+ LOG.info("Got application report from ASM for" + ", appId="
+ + appId.getId() + ", appAttemptId="
+ + report.getCurrentApplicationAttemptId() + ", clientToken="
+ + report.getClientToken() + ", appDiagnostics="
+ + report.getDiagnostics() + ", appMasterHost=" + report.getHost()
+ + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
+ + report.getRpcPort() + ", appStartTime=" + report.getStartTime()
+ + ", yarnAppState=" + report.getYarnApplicationState().toString()
+ + ", distributedFinalState="
+ + report.getFinalApplicationStatus().toString() + ", appTrackingUrl="
+ + report.getTrackingUrl() + ", appUser=" + report.getUser());
+
+ YarnApplicationState state = report.getYarnApplicationState();
+ if (finalState.contains(state)) {
+ return report;
+ }
+ try {
+ Thread.sleep(sleepTime);
+ sleepTime = count * 100;
+ if(count < 10) {
+ count++;
+ }
+ } catch (InterruptedException e) {
+ //LOG.debug("Thread sleep in monitoring loop interrupted");
+ }
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
new file mode 100644
index 0000000..a3c7b75
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.QueryMasterManagerProtocol;
+import org.apache.tajo.ipc.QueryMasterManagerProtocol.QueryHeartbeat;
+import org.apache.tajo.ipc.QueryMasterManagerProtocol.QueryHeartbeatResponse;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.rpc.ProtoBlockingRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+
+import java.net.InetSocketAddress;
+
+public class QueryMasterManagerService extends AbstractService {
+ private final static Log LOG = LogFactory.getLog(QueryMasterManagerService.class);
+
+ private final TajoMaster.MasterContext context;
+ private final TajoConf conf;
+ private final QueryMasterManagerProtocolServiceHandler masterHandler;
+ private ProtoBlockingRpcServer server;
+ private InetSocketAddress bindAddress;
+
+ private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
+ private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build();
+
+ public QueryMasterManagerService(TajoMaster.MasterContext context) {
+ super(QueryMasterManagerService.class.getName());
+ this.context = context;
+ this.conf = context.getConf();
+ this.masterHandler = new QueryMasterManagerProtocolServiceHandler();
+ }
+
+ @Override
+ public void start() {
+ // TODO resolve hostname
+ String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS);
+ InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+ try {
+ server = new ProtoBlockingRpcServer(QueryMasterManagerProtocol.class, masterHandler, initIsa);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ server.start();
+ bindAddress = server.getBindAddress();
+ this.conf.setVar(TajoConf.ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS,
+ org.apache.tajo.util.NetUtils.getIpPortString(bindAddress));
+ LOG.info("Instantiated QueryMasterManagerService at " + this.bindAddress);
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ if(server != null) {
+ server.shutdown();
+ server = null;
+ }
+ super.stop();
+ }
+
+ public InetSocketAddress getBindAddress() {
+ return bindAddress;
+ }
+
+ public class QueryMasterManagerProtocolServiceHandler implements QueryMasterManagerProtocol.QueryMasterManagerProtocolService.BlockingInterface {
+ @Override
+ public QueryHeartbeatResponse queryHeartbeat(RpcController controller, QueryHeartbeat request) throws ServiceException {
+ // TODO - separate QueryMasterManagerProtocol, ClientServiceProtocol
+ QueryId queryId = new QueryId(request.getQueryId());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Received QueryHeartbeat:" + queryId + "," + request);
+ }
+ QueryMasterManager queryMasterManager = context.getQuery(queryId);
+ if (queryMasterManager == null) {
+ LOG.warn("No query:" + queryId);
+ return QueryHeartbeatResponse.newBuilder().setHeartbeatResult(BOOL_FALSE).build();
+ }
+
+ QueryHeartbeatResponse.ResponseCommand command = queryMasterManager.queryHeartbeat(request);
+
+ //ApplicationAttemptId attemptId = queryMasterManager.getAppAttemptId();
+ //String attemptIdStr = attemptId == null ? null : attemptId.toString();
+ QueryHeartbeatResponse.Builder builder = QueryHeartbeatResponse.newBuilder();
+ builder.setHeartbeatResult(BOOL_TRUE);
+ if(command != null) {
+ builder.setResponseCommand(command);
+ }
+ return builder.build();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
new file mode 100644
index 0000000..f34464b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.QueryConf;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.util.TajoIdUtils;
+
+import java.io.PrintWriter;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+
+public class QueryMasterRunner extends AbstractService {
+ private static final Log LOG = LogFactory.getLog(QueryMasterRunner.class);
+ private QueryConf queryConf;
+ private QueryMaster queryMaster;
+ private QueryId queryId;
+ private long appSubmitTime;
+ private String queryMasterManagerAddress;
+
+ public QueryMasterRunner(QueryId queryId, long appSubmitTime, String queryMasterManagerAddress) {
+ super(QueryMasterRunner.class.getName());
+ this.queryId = queryId;
+ this.appSubmitTime = appSubmitTime;
+ this.queryMasterManagerAddress = queryMasterManagerAddress;
+ }
+
+ private class ShutdownHook implements Runnable {
+ @Override
+ public void run() {
+ LOG.info("============================================");
+ LOG.info("QueryMaster received SIGINT Signal");
+ LOG.info("============================================");
+ stop();
+ }
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ this.queryConf = (QueryConf)conf;
+ RackResolver.init(queryConf);
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
+
+ super.init(conf);
+ }
+
+ @Override
+ public void start() {
+ //create QueryMaster
+ QueryMaster query = new QueryMaster(queryId, appSubmitTime, queryMasterManagerAddress);
+
+ query.init(queryConf);
+ query.start();
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("QueryMasterRunner started");
+
+ final QueryConf conf = new QueryConf();
+ conf.addResource(new Path(QueryConf.FILENAME));
+
+ UserGroupInformation.setConfiguration(conf);
+
+ final QueryId queryId = TajoIdUtils.createQueryId(args[0]);
+ final long appSubmitTime = Long.parseLong(args[1]);
+ final String queryMasterManagerAddr = args[2];
+
+ LOG.info("Received QueryId:" + queryId);
+
+ QueryMasterRunner queryMasterRunner = new QueryMasterRunner(queryId, appSubmitTime, queryMasterManagerAddr);
+ queryMasterRunner.init(conf);
+ queryMasterRunner.start();
+
+ synchronized(queryId) {
+ queryId.wait();
+ }
+
+ System.exit(0);
+ }
+
+ public static void printThreadInfo(PrintWriter stream, String title) {
+ ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+ final int STACK_DEPTH = 60;
+ boolean contention = threadBean.isThreadContentionMonitoringEnabled();
+ long[] threadIds = threadBean.getAllThreadIds();
+ stream.println("Process Thread Dump: " + title);
+ stream.println(threadIds.length + " active threads");
+ for (long tid : threadIds) {
+ ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH);
+ if (info == null) {
+ stream.println(" Inactive");
+ continue;
+ }
+ stream.println("Thread " + getTaskName(info.getThreadId(), info.getThreadName()) + ":");
+ Thread.State state = info.getThreadState();
+ stream.println(" State: " + state);
+ stream.println(" Blocked count: " + info.getBlockedCount());
+ stream.println(" Waited count: " + info.getWaitedCount());
+ if (contention) {
+ stream.println(" Blocked time: " + info.getBlockedTime());
+ stream.println(" Waited time: " + info.getWaitedTime());
+ }
+ if (state == Thread.State.WAITING) {
+ stream.println(" Waiting on " + info.getLockName());
+ } else if (state == Thread.State.BLOCKED) {
+ stream.println(" Blocked on " + info.getLockName());
+ stream.println(" Blocked by " + getTaskName(info.getLockOwnerId(), info.getLockOwnerName()));
+ }
+ stream.println(" Stack:");
+ for (StackTraceElement frame : info.getStackTrace()) {
+ stream.println(" " + frame.toString());
+ }
+ }
+ stream.flush();
+ }
+
+ private static String getTaskName(long id, String name) {
+ if (name == null) {
+ return Long.toString(id);
+ }
+ return id + " (" + name + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
new file mode 100644
index 0000000..fcb8f3e
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.ipc.QueryMasterProtocol.Partition;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.master.TaskState;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.storage.Fragment;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class QueryUnit implements EventHandler<TaskEvent> {
+ /** Class Logger */
+ private static final Log LOG = LogFactory.getLog(QueryUnit.class);
+
+ private QueryUnitId taskId;
+ private EventHandler eventHandler;
+ private StoreTableNode store = null;
+ private LogicalNode plan = null;
+ private List<ScanNode> scan;
+
+ private Map<String, Fragment> fragMap;
+ private Map<String, Set<URI>> fetchMap;
+
+ private List<Partition> partitions;
+ private TableStat stats;
+ private String [] dataLocations;
+ private final boolean isLeafTask;
+ private List<IntermediateEntry> intermediateData;
+
+ private Map<QueryUnitAttemptId, QueryUnitAttempt> attempts;
+ private final int maxAttempts = 3;
+ private Integer lastAttemptId;
+
+ private QueryUnitAttemptId successfulAttempt;
+ private String succeededHost;
+ private int succeededPullServerPort;
+
+ private int failedAttempts;
+ private int finishedAttempts; // finish are total of success, failed and killed
+
+ private static final StateMachineFactory
+ <QueryUnit, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
+ new StateMachineFactory
+ <QueryUnit, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
+
+ .addTransition(TaskState.NEW, TaskState.SCHEDULED,
+ TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
+
+ .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
+ TaskEventType.T_ATTEMPT_LAUNCHED)
+
+ .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+ TaskEventType.T_ATTEMPT_LAUNCHED)
+
+ .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
+ TaskEventType.T_ATTEMPT_SUCCEEDED, new AttemptSucceededTransition())
+
+ .addTransition(TaskState.RUNNING,
+ EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
+ TaskEventType.T_ATTEMPT_FAILED, new AttemptFailedTransition())
+
+
+
+ .installTopology();
+ private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine;
+
+
+ private final Lock readLock;
+ private final Lock writeLock;
+
+ public QueryUnit(QueryUnitId id, boolean isLeafTask, EventHandler eventHandler) {
+ this.taskId = id;
+ this.eventHandler = eventHandler;
+ this.isLeafTask = isLeafTask;
+ scan = new ArrayList<ScanNode>();
+ fetchMap = Maps.newHashMap();
+ fragMap = Maps.newHashMap();
+ partitions = new ArrayList<Partition>();
+ attempts = Collections.emptyMap();
+ lastAttemptId = -1;
+ failedAttempts = 0;
+
+ ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ this.readLock = readWriteLock.readLock();
+ this.writeLock = readWriteLock.writeLock();
+
+ stateMachine = stateMachineFactory.make(this);
+ }
+
+ public boolean isLeafTask() {
+ return this.isLeafTask;
+ }
+
+ public void setDataLocations(String [] dataLocations) {
+ this.dataLocations = dataLocations;
+ }
+
+ public String [] getDataLocations() {
+ return this.dataLocations;
+ }
+
+ public TaskState getState() {
+ readLock.lock();
+ try {
+ return stateMachine.getCurrentState();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public void setLogicalPlan(LogicalNode plan) {
+ Preconditions.checkArgument(plan.getType() == ExprType.STORE ||
+ plan.getType() == ExprType.CREATE_INDEX);
+
+ this.plan = plan;
+ store = (StoreTableNode) plan;
+
+ LogicalNode node = plan;
+ ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
+ s.add(node);
+ while (!s.isEmpty()) {
+ node = s.remove(s.size()-1);
+ if (node instanceof UnaryNode) {
+ UnaryNode unary = (UnaryNode) node;
+ s.add(s.size(), unary.getSubNode());
+ } else if (node instanceof BinaryNode) {
+ BinaryNode binary = (BinaryNode) node;
+ s.add(s.size(), binary.getOuterNode());
+ s.add(s.size(), binary.getInnerNode());
+ } else if (node instanceof ScanNode) {
+ scan.add((ScanNode)node);
+ }
+ }
+ }
+
+ @Deprecated
+ public void setFragment(String tableId, Fragment fragment) {
+ this.fragMap.put(tableId, fragment);
+ if (fragment.hasDataLocations()) {
+ setDataLocations(fragment.getDataLocations());
+ }
+ }
+
+ public void setFragment2(Fragment fragment) {
+ this.fragMap.put(fragment.getId(), fragment);
+ if (fragment.hasDataLocations()) {
+ setDataLocations(fragment.getDataLocations());
+ }
+ }
+
+ public void addFetch(String tableId, String uri) throws URISyntaxException {
+ this.addFetch(tableId, new URI(uri));
+ }
+
+ public void addFetch(String tableId, URI uri) {
+ Set<URI> uris;
+ if (fetchMap.containsKey(tableId)) {
+ uris = fetchMap.get(tableId);
+ } else {
+ uris = Sets.newHashSet();
+ }
+ uris.add(uri);
+ fetchMap.put(tableId, uris);
+ }
+
+ public void addFetches(String tableId, Collection<URI> urilist) {
+ Set<URI> uris;
+ if (fetchMap.containsKey(tableId)) {
+ uris = fetchMap.get(tableId);
+ } else {
+ uris = Sets.newHashSet();
+ }
+ uris.addAll(urilist);
+ fetchMap.put(tableId, uris);
+ }
+
+ public void setFetches(Map<String, Set<URI>> fetches) {
+ this.fetchMap.clear();
+ this.fetchMap.putAll(fetches);
+ }
+
+ public Fragment getFragment(String tableId) {
+ return this.fragMap.get(tableId);
+ }
+
+ public Collection<Fragment> getAllFragments() {
+ return fragMap.values();
+ }
+
+ public LogicalNode getLogicalPlan() {
+ return this.plan;
+ }
+
+ public QueryUnitId getId() {
+ return taskId;
+ }
+
+ public Collection<URI> getFetchHosts(String tableId) {
+ return fetchMap.get(tableId);
+ }
+
+ public Collection<Set<URI>> getFetches() {
+ return fetchMap.values();
+ }
+
+ public Collection<URI> getFetch(ScanNode scan) {
+ return this.fetchMap.get(scan.getTableId());
+ }
+
+ public String getOutputName() {
+ return this.store.getTableName();
+ }
+
+ public Schema getOutputSchema() {
+ return this.store.getOutSchema();
+ }
+
+ public StoreTableNode getStoreTableNode() {
+ return this.store;
+ }
+
+ public ScanNode[] getScanNodes() {
+ return this.scan.toArray(new ScanNode[scan.size()]);
+ }
+
+ @Override
+ public String toString() {
+ String str = new String(plan.getType() + " \n");
+ for (Entry<String, Fragment> e : fragMap.entrySet()) {
+ str += e.getKey() + " : ";
+ str += e.getValue() + " ";
+ }
+ for (Entry<String, Set<URI>> e : fetchMap.entrySet()) {
+ str += e.getKey() + " : ";
+ for (URI t : e.getValue()) {
+ str += t + " ";
+ }
+ }
+
+ return str;
+ }
+
+ public void setStats(TableStat stats) {
+ this.stats = stats;
+ }
+
+ public void setPartitions(List<Partition> partitions) {
+ this.partitions = Collections.unmodifiableList(partitions);
+ }
+
+ public TableStat getStats() {
+ return this.stats;
+ }
+
+ public List<Partition> getPartitions() {
+ return this.partitions;
+ }
+
+ public int getPartitionNum() {
+ return this.partitions.size();
+ }
+
+ public QueryUnitAttempt newAttempt() {
+ QueryUnitAttempt attempt = new QueryUnitAttempt(
+ QueryIdFactory.newQueryUnitAttemptId(this.getId(),
+ ++lastAttemptId), this, eventHandler);
+ return attempt;
+ }
+
+ public QueryUnitAttempt getAttempt(QueryUnitAttemptId attemptId) {
+ return attempts.get(attemptId);
+ }
+
+ public QueryUnitAttempt getAttempt(int attempt) {
+ return this.attempts.get(new QueryUnitAttemptId(this.getId(), attempt));
+ }
+
+ public QueryUnitAttempt getLastAttempt() {
+ return this.attempts.get(this.lastAttemptId);
+ }
+
+ protected QueryUnitAttempt getSuccessfulAttempt() {
+ readLock.lock();
+ try {
+ if (null == successfulAttempt) {
+ return null;
+ }
+ return attempts.get(successfulAttempt);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public int getRetryCount () {
+ return this.lastAttemptId;
+ }
+
+ private static class InitialScheduleTransition implements
+ SingleArcTransition<QueryUnit, TaskEvent> {
+
+ @Override
+ public void transition(QueryUnit task, TaskEvent taskEvent) {
+ task.addAndScheduleAttempt();
+ }
+ }
+
+ // This is always called in the Write Lock
+ private void addAndScheduleAttempt() {
+ // Create new task attempt
+ QueryUnitAttempt attempt = newAttempt();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created attempt " + attempt.getId());
+ }
+ switch (attempts.size()) {
+ case 0:
+ attempts = Collections.singletonMap(attempt.getId(), attempt);
+ break;
+
+ case 1:
+ Map<QueryUnitAttemptId, QueryUnitAttempt> newAttempts
+ = new LinkedHashMap<QueryUnitAttemptId, QueryUnitAttempt>(3);
+ newAttempts.putAll(attempts);
+ attempts = newAttempts;
+ attempts.put(attempt.getId(), attempt);
+ break;
+
+ default:
+ attempts.put(attempt.getId(), attempt);
+ break;
+ }
+
+ if (failedAttempts > 0) {
+ eventHandler.handle(new TaskAttemptEvent(attempt.getId(),
+ TaskAttemptEventType.TA_RESCHEDULE));
+ } else {
+ eventHandler.handle(new TaskAttemptEvent(attempt.getId(),
+ TaskAttemptEventType.TA_SCHEDULE));
+ }
+ }
+
+ private static class AttemptSucceededTransition
+ implements SingleArcTransition<QueryUnit, TaskEvent>{
+
+ @Override
+ public void transition(QueryUnit task,
+ TaskEvent event) {
+ TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
+ QueryUnitAttempt attempt = task.attempts.get(
+ attemptEvent.getTaskAttemptId());
+ task.successfulAttempt = attemptEvent.getTaskAttemptId();
+ task.succeededHost = attempt.getHost();
+ task.succeededPullServerPort = attempt.getPullServerPort();
+ task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(),
+ SubQueryEventType.SQ_TASK_COMPLETED));
+ }
+ }
+
+ private static class AttemptFailedTransition implements
+ MultipleArcTransition<QueryUnit, TaskEvent, TaskState> {
+
+ @Override
+ public TaskState transition(QueryUnit task, TaskEvent taskEvent) {
+ TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent;
+ LOG.info("=============================================================");
+ LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<");
+ LOG.info("=============================================================");
+ task.failedAttempts++;
+ task.finishedAttempts++;
+
+ if (task.failedAttempts < task.maxAttempts) {
+ if (task.successfulAttempt == null) {
+ task.addAndScheduleAttempt();
+ }
+ } else {
+ task.eventHandler.handle(
+ new SubQueryTaskEvent(task.getId(), SubQueryEventType.SQ_FAILED));
+ return TaskState.FAILED;
+ }
+
+ return task.getState();
+ }
+ }
+
+ @Override
+ public void handle(TaskEvent event) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing " + event.getTaskId() + " of type "
+ + event.getType());
+ }
+
+ try {
+ writeLock.lock();
+ TaskState oldState = getState();
+ try {
+ stateMachine.doTransition(event.getType(), event);
+ } catch (InvalidStateTransitonException e) {
+ LOG.error("Can't handle this event at current state", e);
+ eventHandler.handle(new QueryEvent(getId().getQueryId(),
+ QueryEventType.INTERNAL_ERROR));
+ }
+
+ //notify the eventhandler of state change
+ if (LOG.isDebugEnabled()) {
+ if (oldState != getState()) {
+ LOG.debug(taskId + " Task Transitioned from " + oldState + " to "
+ + getState());
+ }
+ }
+ }
+
+ finally {
+ writeLock.unlock();
+ }
+ }
+
+ public void setIntermediateData(Collection<IntermediateEntry> partitions) {
+ this.intermediateData = new ArrayList<IntermediateEntry>(partitions);
+ }
+
+ public List<IntermediateEntry> getIntermediateData() {
+ return this.intermediateData;
+ }
+
+ public static class IntermediateEntry {
+ int taskId;
+ int attemptId;
+ int partitionId;
+ String pullHost;
+ int port;
+
+ public IntermediateEntry(int taskId, int attemptId, int partitionId,
+ String pullServerAddr, int pullServerPort) {
+ this.taskId = taskId;
+ this.attemptId = attemptId;
+ this.partitionId = partitionId;
+ this.pullHost = pullServerAddr;
+ this.port = pullServerPort;
+ }
+
+ public int getTaskId() {
+ return this.taskId;
+ }
+
+ public int getAttemptId() {
+ return this.attemptId;
+ }
+
+ public int getPartitionId() {
+ return this.partitionId;
+ }
+
+ public String getPullHost() {
+ return this.pullHost;
+ }
+
+ public int getPullPort() {
+ return port;
+ }
+
+ public String getPullAddress() {
+ return pullHost + ":" + port;
+ }
+ }
+}