You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/14 08:48:05 UTC
[7/8] TAJO-91: Launch QueryMaster on NodeManager per query.
(hyoungjunkim via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java
new file mode 100644
index 0000000..b935eb7
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java
@@ -0,0 +1,429 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.QueryConf;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.querymaster.QueryMaster;
+import org.apache.tajo.pullserver.PullServerAuxService;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.*;
+
+public abstract class ContainerProxy {
+ private static final Log LOG = LogFactory.getLog(ContainerProxy.class);
+
+ final public static FsPermission QUERYCONF_FILE_PERMISSION =
+ FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+ private final static RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
+ private static enum ContainerState {
+ PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
+ }
+
+ private final YarnRPC yarnRPC;
+ private Configuration conf;
+ private QueryMaster.QueryContext context;
+
+ private ContainerState state;
+ // store enough information to be able to cleanup the container
+ private Container container;
+ private ContainerId containerID;
+ final private String containerMgrAddress;
+ private ContainerToken containerToken;
+ private String hostName;
+ private int port = -1;
+
+ protected abstract void containerStarted();
+ protected abstract String getId();
+ protected abstract String getRunnerClass();
+ protected abstract Vector<CharSequence> getTaskParams();
+
+ public ContainerProxy(QueryMaster.QueryContext context, Configuration conf, YarnRPC yarnRPC, Container container) {
+ this.context = context;
+ this.conf = conf;
+ this.yarnRPC = yarnRPC;
+ this.state = ContainerState.PREP;
+ this.container = container;
+ this.containerID = container.getId();
+ NodeId nodeId = container.getNodeId();
+ this.containerMgrAddress = nodeId.getHost() + ":" + nodeId.getPort();
+ this.containerToken = container.getContainerToken();
+ }
+
+ protected ContainerManager getCMProxy(ContainerId containerID,
+ final String containerManagerBindAddr,
+ ContainerToken containerToken)
+ throws IOException {
+ String [] hosts = containerManagerBindAddr.split(":");
+ final InetSocketAddress cmAddr =
+ new InetSocketAddress(hosts[0], Integer.parseInt(hosts[1]));
+ UserGroupInformation user = UserGroupInformation.getCurrentUser();
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ Token<ContainerTokenIdentifier> token =
+ ProtoUtils.convertFromProtoFormat(containerToken, cmAddr);
+ // the user in createRemoteUser in this context has to be ContainerID
+ user = UserGroupInformation.createRemoteUser(containerID.toString());
+ user.addToken(token);
+ }
+
+ ContainerManager proxy = user.doAs(new PrivilegedAction<ContainerManager>() {
+ @Override
+ public ContainerManager run() {
+ return (ContainerManager) yarnRPC.getProxy(ContainerManager.class, cmAddr, conf);
+ }
+ });
+
+ return proxy;
+ }
+
+ public synchronized boolean isCompletelyDone() {
+ return state == ContainerState.DONE || state == ContainerState.FAILED;
+ }
+
+ @SuppressWarnings("unchecked")
+ public synchronized void launch(ContainerLaunchContext commonContainerLaunchContext) {
+ LOG.info("Launching Container with Id: " + containerID);
+ if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
+ state = ContainerState.DONE;
+ LOG.error("Container (" + containerID + " was killed before it was launched");
+ return;
+ }
+
+ ContainerManager proxy = null;
+ try {
+
+ proxy = getCMProxy(containerID, containerMgrAddress,
+ containerToken);
+
+ // Construct the actual Container
+ ContainerLaunchContext containerLaunchContext = createContainerLaunchContext(commonContainerLaunchContext);
+
+ // Now launch the actual container
+ StartContainerRequest startRequest = Records
+ .newRecord(StartContainerRequest.class);
+ startRequest.setContainerLaunchContext(containerLaunchContext);
+ StartContainerResponse response = proxy.startContainer(startRequest);
+
+ ByteBuffer portInfo = response
+ .getServiceResponse(PullServerAuxService.PULLSERVER_SERVICEID);
+
+ if(portInfo != null) {
+ port = PullServerAuxService.deserializeMetaData(portInfo);
+ }
+
+ LOG.info("PullServer port returned by ContainerManager for "
+ + containerID + " : " + port);
+
+ if(port < 0) {
+ this.state = ContainerState.FAILED;
+ throw new IllegalStateException("Invalid shuffle port number "
+ + port + " returned for " + containerID);
+ }
+
+ containerStarted();
+
+ this.state = ContainerState.RUNNING;
+ this.hostName = containerMgrAddress.split(":")[0];
+ context.addContainer(containerID, this);
+ } catch (Throwable t) {
+ String message = "Container launch failed for " + containerID + " : "
+ + StringUtils.stringifyException(t);
+ this.state = ContainerState.FAILED;
+ LOG.error(message);
+ } finally {
+ if (proxy != null) {
+ yarnRPC.stopProxy(proxy, conf);
+ }
+ }
+ }
+
+ public synchronized void kill() {
+
+ if(isCompletelyDone()) {
+ return;
+ }
+ if(this.state == ContainerState.PREP) {
+ this.state = ContainerState.KILLED_BEFORE_LAUNCH;
+ } else {
+ LOG.info("KILLING " + containerID);
+
+ ContainerManager proxy = null;
+ try {
+ proxy = getCMProxy(this.containerID, this.containerMgrAddress,
+ this.containerToken);
+
+ // kill the remote container if already launched
+ StopContainerRequest stopRequest = Records
+ .newRecord(StopContainerRequest.class);
+ stopRequest.setContainerId(this.containerID);
+ proxy.stopContainer(stopRequest);
+ // If stopContainer returns without an error, assuming the stop made
+ // it over to the NodeManager.
+// context.getEventHandler().handle(
+// new AMContainerEvent(containerID, AMContainerEventType.C_NM_STOP_SENT));
+ context.removeContainer(containerID);
+ } catch (Throwable t) {
+
+ // ignore the cleanup failure
+ String message = "cleanup failed for container "
+ + this.containerID + " : "
+ + StringUtils.stringifyException(t);
+ LOG.warn(message);
+ this.state = ContainerState.DONE;
+ return;
+ } finally {
+ if (proxy != null) {
+ yarnRPC.stopProxy(proxy, conf);
+ }
+ }
+ this.state = ContainerState.DONE;
+ }
+ }
+
+ public static ContainerLaunchContext createCommonContainerLaunchContext(Configuration config) {
+ TajoConf conf = (TajoConf)config;
+
+ ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+ try {
+ ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // Set the env variables to be setup
+ ////////////////////////////////////////////////////////////////////////////
+ LOG.info("Set the environment for the application master");
+
+ Map<String, String> environment = new HashMap<String, String>();
+ //String initialClassPath = getInitialClasspath(conf);
+ environment.put(ApplicationConstants.Environment.SHELL.name(), "/bin/bash");
+ if(System.getenv(ApplicationConstants.Environment.JAVA_HOME.name()) != null) {
+ environment.put(ApplicationConstants.Environment.JAVA_HOME.name(),
+ System.getenv(ApplicationConstants.Environment.JAVA_HOME.name()));
+ }
+
+ // TODO - to be improved with org.apache.tajo.sh shell script
+ Properties prop = System.getProperties();
+
+ if (prop.getProperty("tajo.test", "FALSE").equalsIgnoreCase("TRUE") ||
+ (System.getenv("tajo.test") != null && System.getenv("tajo.test").equalsIgnoreCase("TRUE"))) {
+ LOG.info("tajo.test is TRUE");
+ environment.put(ApplicationConstants.Environment.CLASSPATH.name(), prop.getProperty("java.class.path", null));
+ environment.put("tajo.test", "TRUE");
+ } else {
+ // Add AppMaster.jar location to classpath
+ // At some point we should not be required to add
+ // the hadoop specific classpaths to the env.
+ // It should be provided out of the box.
+ // For now setting all required classpaths including
+ // the classpath to "." for the application jar
+ StringBuilder classPathEnv = new StringBuilder("./");
+ //for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
+ for (String c : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH) {
+ classPathEnv.append(':');
+ classPathEnv.append(c.trim());
+ }
+
+ classPathEnv.append(":" + System.getenv("TAJO_BASE_CLASSPATH"));
+ classPathEnv.append(":./log4j.properties:./*");
+ if(System.getenv("HADOOP_HOME") != null) {
+ environment.put("HADOOP_HOME", System.getenv("HADOOP_HOME"));
+ environment.put(ApplicationConstants.Environment.HADOOP_COMMON_HOME.name(), System.getenv("HADOOP_HOME"));
+ environment.put(ApplicationConstants.Environment.HADOOP_HDFS_HOME.name(), System.getenv("HADOOP_HOME"));
+ environment.put(ApplicationConstants.Environment.HADOOP_YARN_HOME.name(), System.getenv("HADOOP_HOME"));
+ }
+
+ if(System.getenv("TAJO_BASE_CLASSPATH") != null) {
+ environment.put("TAJO_BASE_CLASSPATH", System.getenv("TAJO_BASE_CLASSPATH"));
+ }
+ environment.put(ApplicationConstants.Environment.CLASSPATH.name(), classPathEnv.toString());
+ }
+
+ ctx.setEnvironment(environment);
+
+ ////////////////////////////////////////////////////////////////////////////
+ // Set the local resources
+ ////////////////////////////////////////////////////////////////////////////
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ FileSystem fs = null;
+
+ LOG.info("defaultFS: " + conf.get("fs.defaultFS"));
+
+ try {
+ fs = FileSystem.get(conf);
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+
+ FileContext fsCtx = null;
+ try {
+ fsCtx = FileContext.getFileContext(conf);
+ } catch (UnsupportedFileSystemException e) {
+ LOG.error(e.getMessage(), e);
+ }
+
+ LOG.info("Writing a QueryConf to HDFS and add to local environment");
+
+ Path queryConfPath = new Path(fs.getHomeDirectory(), QueryConf.FILENAME);
+ try {
+ writeConf(conf, queryConfPath);
+
+ LocalResource queryConfSrc = createApplicationResource(fsCtx, queryConfPath, LocalResourceType.FILE);
+ localResources.put(QueryConf.FILENAME, queryConfSrc);
+
+ ctx.setLocalResources(localResources);
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+
+ // TODO - move to sub-class
+ // Add shuffle token
+ Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+ try {
+ serviceData.put(PullServerAuxService.PULLSERVER_SERVICEID, PullServerAuxService.serializeMetaData(0));
+ } catch (IOException ioe) {
+ LOG.error(ioe);
+ }
+ ctx.setServiceData(serviceData);
+
+ return ctx;
+ }
+
+ private static LocalResource createApplicationResource(FileContext fs,
+ Path p, LocalResourceType type)
+ throws IOException {
+ LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
+ FileStatus rsrcStat = fs.getFileStatus(p);
+ rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
+ .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
+ rsrc.setSize(rsrcStat.getLen());
+ rsrc.setTimestamp(rsrcStat.getModificationTime());
+ rsrc.setType(type);
+ rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ return rsrc;
+ }
+
+ private static void writeConf(Configuration conf, Path queryConfFile)
+ throws IOException {
+ // Write job file to Tajo's fs
+ FileSystem fs = queryConfFile.getFileSystem(conf);
+ FSDataOutputStream out =
+ FileSystem.create(fs, queryConfFile,
+ new FsPermission(QUERYCONF_FILE_PERMISSION));
+ try {
+ conf.writeXml(out);
+ } finally {
+ out.close();
+ }
+ }
+
+ public ContainerLaunchContext createContainerLaunchContext(ContainerLaunchContext commonContainerLaunchContext) {
+ // Setup environment by cloning from common env.
+ Map<String, String> env = commonContainerLaunchContext.getEnvironment();
+ Map<String, String> myEnv = new HashMap<String, String>(env.size());
+ myEnv.putAll(env);
+
+ // Duplicate the ByteBuffers for access by multiple containers.
+ Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
+ for (Map.Entry<String, ByteBuffer> entry : commonContainerLaunchContext.getServiceData().entrySet()) {
+ myServiceData.put(entry.getKey(), entry.getValue().duplicate());
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // Set the local resources
+ ////////////////////////////////////////////////////////////////////////////
+ // Set the necessary command to execute the application master
+ Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+ // Set java executable command
+ //LOG.info("Setting up app master command");
+ vargs.add("${JAVA_HOME}" + "/bin/java");
+ // Set Xmx based on am memory size
+ vargs.add("-Xmx2000m");
+ // Set Remote Debugging
+ //if (!context.getQuery().getSubQuery(event.getSubQueryId()).isLeafQuery()) {
+ //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
+ //}
+ // Set class name
+ vargs.add(getRunnerClass());
+ vargs.add(getId()); // subqueryId
+ vargs.add(containerMgrAddress); // nodeId
+ vargs.add(containerID.toString()); // containerId
+ Vector<CharSequence> taskParams = getTaskParams();
+ if(taskParams != null) {
+ vargs.addAll(taskParams);
+ }
+
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+ // Get final commmand
+ StringBuilder command = new StringBuilder();
+ for (CharSequence str : vargs) {
+ command.append(str).append(" ");
+ }
+
+ LOG.info("Completed setting up TaskRunner command " + command.toString());
+ List<String> commands = new ArrayList<String>();
+ commands.add(command.toString());
+
+ return BuilderUtils.newContainerLaunchContext(containerID, commonContainerLaunchContext.getUser(),
+ container.getResource(), commonContainerLaunchContext.getLocalResources(), myEnv, commands,
+ myServiceData, null, new HashMap<ApplicationAccessType, String>());
+ }
+
+ public String getTaskHostName() {
+ return this.hostName;
+ }
+
+ public int getTaskPort() {
+ return this.port;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 2f4f12f..3d9a364 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -51,6 +51,7 @@ import org.apache.tajo.engine.planner.global.GlobalOptimizer;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.master.querymaster.QueryMasterManager;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.TajoIdUtils;
@@ -93,8 +94,7 @@ public class GlobalEngine extends AbstractService {
planner = new LogicalPlanner(context.getCatalog());
optimizer = new LogicalOptimizer();
- globalPlanner = new GlobalPlanner(context.getConf(), context.getCatalog(),
- sm, context.getEventHandler());
+ globalPlanner = new GlobalPlanner(context.getConf(), sm, context.getEventHandler());
globalOptimizer = new GlobalOptimizer();
} catch (Throwable t) {
@@ -105,7 +105,9 @@ public class GlobalEngine extends AbstractService {
public void stop() {
super.stop();
- yarnClient.stop();
+ if (yarnClient != null) {
+ yarnClient.stop();
+ }
}
public QueryId executeQuery(String tql)
@@ -123,21 +125,26 @@ public class GlobalEngine extends AbstractService {
updateQuery(plan.getSubNode());
return TajoIdUtils.NullQueryId;
} else {
- ApplicationAttemptId appAttemptId = submitQuery();
- QueryId queryId = TajoIdUtils.createQueryId(appAttemptId);
- MasterPlan masterPlan = createGlobalPlan(queryId, plan);
+ GetNewApplicationResponse newApp = yarnClient.getNewApplication();
+ ApplicationId appId = newApp.getApplicationId();
+ QueryId queryId = TajoIdUtils.createQueryId(appId, 0);
+
+ LOG.info("Get AppId: " + appId + ", QueryId: " + queryId);
+ LOG.info("Setting up application submission context for ASM");
+
+ //request QueryMaster container
QueryConf queryConf = new QueryConf(context.getConf());
queryConf.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
-
// the output table is given by user
if (plan.getSubNode().getType() == ExprType.CREATE_TABLE) {
CreateTableNode createTableNode = (CreateTableNode) plan.getSubNode();
queryConf.setOutputTable(createTableNode.getTableName());
}
-
- QueryMaster query = new QueryMaster(context, appAttemptId,
- context.getClock(), querySubmittionTime, masterPlan);
- startQuery(queryId, queryConf, query);
+ QueryMasterManager queryMasterManager = new QueryMasterManager(context, yarnClient, queryId, tql, plan, appId,
+ context.getClock(), querySubmittionTime);
+ queryMasterManager.init(queryConf);
+ queryMasterManager.start();
+ context.addQuery(queryId, queryMasterManager);
return queryId;
}
@@ -145,9 +152,46 @@ public class GlobalEngine extends AbstractService {
 private ApplicationAttemptId submitQuery() throws YarnRemoteException {
+ // Requests a new YARN application id, submits an application named "Tajo" (priority 5,
+ // queue "default", an empty AM ContainerLaunchContext) through yarnClient, waits via
+ // monitorApplication for the ACCEPTED state and returns that report's current attempt id.
+ // NOTE(review): executeQuery in this patch no longer calls submitQuery (its call was
+ // removed) -- confirm whether this method is still used anywhere.
 GetNewApplicationResponse newApp = getNewApplication();
+ ApplicationId appId = newApp.getApplicationId();
+ LOG.info("Get AppId: " + appId);
+ LOG.info("Setting up application submission context for ASM");
+
+ ApplicationSubmissionContext appContext = Records
+ .newRecord(ApplicationSubmissionContext.class);
+
+ // set the application id
+ appContext.setApplicationId(appId);
+ // set the application name
+ appContext.setApplicationName("Tajo");
+
+ org.apache.hadoop.yarn.api.records.Priority
+ pri = Records.newRecord(org.apache.hadoop.yarn.api.records.Priority.class);
+ pri.setPriority(5);
+ appContext.setPriority(pri);
+
+ // Set the queue to which this application is to be submitted in the RM
+ appContext.setQueue("default");
+
+ // NOTE(review): no command or resources are set on this AM spec -- confirm the RM
+ // accepts an empty ContainerLaunchContext for this use.
+ ContainerLaunchContext amContainer = Records
+ .newRecord(ContainerLaunchContext.class);
+ appContext.setAMContainerSpec(amContainer);
+
+ LOG.info("Submitting application to ASM");
+ yarnClient.submitApplication(appContext);
+
+ // monitorApplication is defined outside this hunk; presumably it polls the RM until the
+ // application reaches one of the given states -- confirm its timeout/failure behavior.
+ ApplicationReport appReport = monitorApplication(appId,
+ EnumSet.of(YarnApplicationState.ACCEPTED));
+ ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
+ LOG.info("Launching application with id: " + attemptId);
+
+ return attemptId;
+ }
+
+ private ApplicationAttemptId submitQueryOld() throws YarnRemoteException {
+ GetNewApplicationResponse newApp = getNewApplication();
// Get a new application id
ApplicationId appId = newApp.getApplicationId();
- System.out.println("Get AppId: " + appId);
+ LOG.info("Get AppId: " + appId);
LOG.info("Setting up application submission context for ASM");
ApplicationSubmissionContext appContext = Records
.newRecord(ApplicationSubmissionContext.class);
@@ -209,7 +253,6 @@ public class GlobalEngine extends AbstractService {
CreateTableNode createTable = (CreateTableNode) root;
createTable(createTable);
return true;
-
case DROP_TABLE:
DropTableNode stmt = (DropTableNode) root;
dropTable(stmt.getTableName());
@@ -227,7 +270,7 @@ public class GlobalEngine extends AbstractService {
try {
optimizedPlan = optimizer.optimize(plan);
} catch (PlanningException e) {
- e.printStackTrace();
+ LOG.error(e.getMessage(), e);
}
LOG.info("LogicalPlan:\n" + plan.getRootBlock().getRoot());
@@ -240,12 +283,12 @@ public class GlobalEngine extends AbstractService {
return globalOptimizer.optimize(globalPlan);
}
- private void startQuery(final QueryId queryId, final QueryConf queryConf,
- final QueryMaster query) {
- context.getAllQueries().put(queryId, query);
- query.init(queryConf);
- query.start();
- }
+// private void startQuery(final QueryId queryId, final QueryConf queryConf,
+// final QueryMaster query) {
+// context.getAllQueries().put(queryId, query);
+// query.init(queryConf);
+// query.start();
+// }
private TableDesc createTable(CreateTableNode createTable) throws IOException {
TableMeta meta;
@@ -328,7 +371,7 @@ public class GlobalEngine extends AbstractService {
private void connectYarnClient() {
this.yarnClient = new YarnClientImpl();
- this.yarnClient.init(getConfig());
+ this.yarnClient.init(context.getConf());
this.yarnClient.start();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
index 9907377..9522086 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
@@ -47,7 +47,7 @@ public class GlobalPlanner {
private StorageManager sm;
private QueryId queryId;
- public GlobalPlanner(final TajoConf conf, final CatalogService catalog,
+ public GlobalPlanner(final TajoConf conf,
final StorageManager sm,
final EventHandler eventHandler)
throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/Query.java
deleted file mode 100644
index 6e7092c..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/Query.java
+++ /dev/null
@@ -1,409 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.tajo.QueryConf;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.SubQueryId;
-import org.apache.tajo.TajoProtos.QueryState;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableDescImpl;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.master.QueryMaster.QueryContext;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.storage.StorageManager;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class Query implements EventHandler<QueryEvent> {
- private static final Log LOG = LogFactory.getLog(Query.class);
-
-
- // Facilities for Query
- private final QueryConf conf;
- private final Clock clock;
- private String queryStr;
- private Map<SubQueryId, SubQuery> subqueries;
- private final EventHandler eventHandler;
- private final MasterPlan plan;
- private final StorageManager sm;
- private QueryContext context;
- private ExecutionBlockCursor cursor;
-
- // Query Status
- private final QueryId id;
- private long appSubmitTime;
- private long startTime;
- private long initializationTime;
- private long finishTime;
- private TableDesc resultDesc;
- private int completedSubQueryCount = 0;
- private final List<String> diagnostics = new ArrayList<String>();
-
- // Internal Variables
- private final Lock readLock;
- private final Lock writeLock;
- private int priority = 100;
-
- // State Machine
- private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine;
-
- private static final StateMachineFactory
- <Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory =
- new StateMachineFactory<Query, QueryState, QueryEventType, QueryEvent>
- (QueryState.QUERY_NEW)
-
- .addTransition(QueryState.QUERY_NEW,
- EnumSet.of(QueryState.QUERY_INIT, QueryState.QUERY_FAILED),
- QueryEventType.INIT, new InitTransition())
-
- .addTransition(QueryState.QUERY_INIT, QueryState.QUERY_RUNNING,
- QueryEventType.START, new StartTransition())
-
- .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
- QueryEventType.INIT_COMPLETED, new InitCompleteTransition())
- .addTransition(QueryState.QUERY_RUNNING,
- EnumSet.of(QueryState.QUERY_RUNNING, QueryState.QUERY_SUCCEEDED,
- QueryState.QUERY_FAILED),
- QueryEventType.SUBQUERY_COMPLETED,
- new SubQueryCompletedTransition())
- .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR,
- QueryEventType.INTERNAL_ERROR, new InternalErrorTransition())
- .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
- QueryEventType.INTERNAL_ERROR)
-
- .installTopology();
-
- public Query(final QueryContext context, final QueryId id, Clock clock,
- final long appSubmitTime,
- final String queryStr,
- final EventHandler eventHandler,
- final MasterPlan plan, final StorageManager sm) {
- this.context = context;
- this.conf = context.getConf();
- this.id = id;
- this.clock = clock;
- this.appSubmitTime = appSubmitTime;
- this.queryStr = queryStr;
- subqueries = Maps.newHashMap();
- this.eventHandler = eventHandler;
- this.plan = plan;
- this.sm = sm;
- cursor = new ExecutionBlockCursor(plan);
-
- ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- this.readLock = readWriteLock.readLock();
- this.writeLock = readWriteLock.writeLock();
-
- stateMachine = stateMachineFactory.make(this);
- }
-
- public boolean isCreateTableStmt() {
- return context.isCreateTableQuery();
- }
-
- protected FileSystem getFileSystem(Configuration conf) throws IOException {
- return FileSystem.get(conf);
- }
-
- public float getProgress() {
- QueryState state = getStateMachine().getCurrentState();
- if (state == QueryState.QUERY_SUCCEEDED) {
- return 1.0f;
- } else {
- int idx = 0;
- float [] subProgresses = new float[subqueries.size()];
- boolean finished = true;
- for (SubQuery subquery: subqueries.values()) {
- if (subquery.getState() != SubQueryState.NEW) {
- subProgresses[idx] = subquery.getProgress();
- if (finished && subquery.getState() != SubQueryState.SUCCEEDED) {
- finished = false;
- }
- } else {
- subProgresses[idx] = 0.0f;
- }
- idx++;
- }
-
- if (finished) {
- return 1.0f;
- }
-
- float totalProgress = 0;
- float proportion = 1.0f / (float)subqueries.size();
-
- for (int i = 0; i < subProgresses.length; i++) {
- totalProgress += subProgresses[i] * proportion;
- }
-
- return totalProgress;
- }
- }
-
- public long getAppSubmitTime() {
- return this.appSubmitTime;
- }
-
- public long getStartTime() {
- return startTime;
- }
-
- public void setStartTime() {
- startTime = clock.getTime();
- }
-
- public long getInitializationTime() {
- return initializationTime;
- }
-
- public void setInitializationTime() {
- initializationTime = clock.getTime();
- }
-
-
- public long getFinishTime() {
- return finishTime;
- }
-
- public void setFinishTime() {
- finishTime = clock.getTime();
- }
-
- public List<String> getDiagnostics() {
- readLock.lock();
- try {
- return diagnostics;
- } finally {
- readLock.unlock();
- }
- }
-
- protected void addDiagnostic(String diag) {
- diagnostics.add(diag);
- }
-
- public TableDesc getResultDesc() {
- return resultDesc;
- }
-
- public void setResultDesc(TableDesc desc) {
- resultDesc = desc;
- }
-
- public MasterPlan getPlan() {
- return plan;
- }
-
- public StateMachine<QueryState, QueryEventType, QueryEvent> getStateMachine() {
- return stateMachine;
- }
-
- public void addSubQuery(SubQuery subquery) {
- subqueries.put(subquery.getId(), subquery);
- }
-
- public QueryId getId() {
- return this.id;
- }
-
- public SubQuery getSubQuery(SubQueryId id) {
- return this.subqueries.get(id);
- }
-
- public QueryState getState() {
- readLock.lock();
- try {
- return stateMachine.getCurrentState();
- } finally {
- readLock.unlock();
- }
- }
-
- public ExecutionBlockCursor getExecutionBlockCursor() {
- return cursor;
- }
-
-  /**
-   * Records the query's start time and always moves the query to
-   * {@link QueryState#QUERY_INIT}.
-   */
-  static class InitTransition
-      implements MultipleArcTransition<Query, QueryEvent, QueryState> {
-
-    @Override
-    public QueryState transition(Query query, QueryEvent event) {
-      query.setStartTime();
-      return QueryState.QUERY_INIT;
-    }
-  }
-
-  /**
-   * Creates a SubQuery for the next block of the execution block cursor,
-   * assigns it the query's current priority (then decrements that priority),
-   * registers it, logs its plan and sends it an {@code SQ_INIT} event.
-   */
-  public static class StartTransition
-      implements SingleArcTransition<Query, QueryEvent> {
-
-    @Override
-    public void transition(Query query, QueryEvent event) {
-      SubQuery first = new SubQuery(query.context,
-          query.getExecutionBlockCursor().nextBlock(), query.sm);
-      first.setPriority(query.priority--);
-      query.addSubQuery(first);
-      LOG.info("Schedule unit plan: \n" + first.getBlock().getPlan());
-      first.handle(new SubQueryEvent(first.getId(), SubQueryEventType.SQ_INIT));
-    }
-  }
-
-  /**
-   * Reacts to a SubQuery reaching a final state.
-   *
-   * <ul>
-   *   <li>If the SubQuery did not succeed, the query fails.</li>
-   *   <li>If it succeeded and the execution block cursor has another block, a
-   *       new SubQuery is created, registered and sent {@code SQ_INIT}; the
-   *       result of {@link Query#checkQueryForCompleted()} is returned.</li>
-   *   <li>Otherwise the query is finished. If all registered subqueries have
-   *       completed, the result table descriptor is published, the table meta
-   *       is written to the output path (best effort), a
-   *       {@link QueryFinishEvent} is emitted and, for CREATE TABLE queries,
-   *       the table is added to the catalog.</li>
-   * </ul>
-   */
-  public static class SubQueryCompletedTransition implements
-      MultipleArcTransition<Query, QueryEvent, QueryState> {
-
-    @Override
-    public QueryState transition(Query query, QueryEvent event) {
-      // increase the count for completed subqueries
-      query.completedSubQueryCount++;
-      SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
-      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
-
-      // if at least one subquery is failed, the query is also failed.
-      if (castEvent.getFinalState() != SubQueryState.SUCCEEDED) {
-        return QueryState.QUERY_FAILED;
-      }
-
-      if (cursor.hasNext()) {
-        // schedule the next execution block as a new SubQuery
-        SubQuery nextSubQuery = new SubQuery(query.context, cursor.nextBlock(), query.sm);
-        nextSubQuery.setPriority(query.priority--);
-        query.addSubQuery(nextSubQuery);
-        nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(),
-            SubQueryEventType.SQ_INIT));
-        LOG.info("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
-        LOG.info("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
-        return query.checkQueryForCompleted();
-      }
-
-      // No more blocks: finish the query.
-      if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
-        SubQuery subQuery = query.getSubQuery(castEvent.getSubQueryId());
-        TableDesc desc = new TableDescImpl(query.conf.getOutputTable(),
-            subQuery.getTableMeta(), query.context.getOutputPath());
-        query.setResultDesc(desc);
-        try {
-          query.writeStat(query.context.getOutputPath(), subQuery);
-        } catch (IOException e) {
-          // Writing the table meta is best effort: the query is still reported
-          // as succeeded, but the failure is logged instead of only being
-          // printed to stderr.
-          LOG.error("Failed to write table meta of " + query.getId()
-              + " to " + query.context.getOutputPath(), e);
-        }
-        query.eventHandler.handle(new QueryFinishEvent(query.getId()));
-
-        if (query.context.isCreateTableQuery()) {
-          query.context.getCatalog().addTable(desc);
-        }
-      }
-
-      return query.finished(QueryState.QUERY_SUCCEEDED);
-    }
-  }
-
-  /**
-   * Appends the diagnostic message carried by a
-   * {@link QueryDiagnosticsUpdateEvent} to the query's diagnostics.
-   */
-  private static class DiagnosticsUpdateTransition implements
-      SingleArcTransition<Query, QueryEvent> {
-    @Override
-    public void transition(Query query, QueryEvent event) {
-      QueryDiagnosticsUpdateEvent update = (QueryDiagnosticsUpdateEvent) event;
-      query.addDiagnostic(update.getDiagnosticUpdate());
-    }
-  }
-
-  /**
-   * Records the initialization time the first time it fires; later firings
-   * leave the already-recorded (non-zero) time untouched.
-   */
-  private static class InitCompleteTransition implements
-      SingleArcTransition<Query, QueryEvent> {
-    @Override
-    public void transition(Query query, QueryEvent event) {
-      if (query.initializationTime != 0) {
-        return;
-      }
-      query.setInitializationTime();
-    }
-  }
-
-  /**
-   * Stamps the finish time of a query that hit an internal error. The state
-   * returned by {@link Query#finished(QueryState)} is discarded because this
-   * is a single-arc transition.
-   */
-  private static class InternalErrorTransition
-      implements SingleArcTransition<Query, QueryEvent> {
-
-    @Override
-    public void transition(Query query, QueryEvent ignored) {
-      query.finished(QueryState.QUERY_ERROR);
-    }
-  }
-
-  /**
-   * Records the finish time and passes {@code finalState} through unchanged,
-   * so a transition can end with {@code return query.finished(state);}.
-   *
-   * @param finalState the state to return
-   * @return {@code finalState}
-   */
-  public QueryState finished(QueryState finalState) {
-    this.setFinishTime();
-    return finalState;
-  }
-
-  /**
-   * Checks whether every registered subquery has completed.
-   *
-   * @return {@link QueryState#QUERY_SUCCEEDED} if the completed-subquery count
-   *     equals the number of registered subqueries, otherwise the query's
-   *     current state
-   */
-  QueryState checkQueryForCompleted() {
-    return completedSubQueryCount == subqueries.size()
-        ? QueryState.QUERY_SUCCEEDED
-        : getState();
-  }
-
-
- @Override
- public void handle(QueryEvent event) {
- LOG.info("Processing " + event.getQueryId() + " of type " + event.getType());
- try {
- writeLock.lock();
- QueryState oldState = getState();
- try {
- getStateMachine().doTransition(event.getType(), event);
- } catch (InvalidStateTransitonException e) {
- LOG.error("Can't handle this event at current state", e);
- eventHandler.handle(new QueryEvent(this.id,
- QueryEventType.INTERNAL_ERROR));
- }
-
- //notify the eventhandler of state change
- if (oldState != getState()) {
- LOG.info(id + " Query Transitioned from " + oldState + " to "
- + getState());
- }
- }
-
- finally {
- writeLock.unlock();
- }
- }
-
-  /**
-   * Writes the table meta of the given subquery to {@code outputPath} through
-   * the query's storage manager.
-   *
-   * @param outputPath the directory to write the table meta into
-   * @param subQuery the subquery whose table meta is written
-   * @throws IOException if the storage manager fails to write the meta
-   */
-  private void writeStat(Path outputPath, SubQuery subQuery)
-      throws IOException {
-    sm.writeTableMeta(outputPath, subQuery.getTableMeta());
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryMaster.java
deleted file mode 100644
index f4dc455..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryMaster.java
+++ /dev/null
@@ -1,465 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.service.CompositeService;
-import org.apache.hadoop.yarn.service.Service;
-import org.apache.tajo.*;
-import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.master.TaskRunnerLauncherImpl.ContainerProxy;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.rm.RMContainerAllocator;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.util.TajoIdUtils;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Runs a single query. It owns a dedicated {@link AsyncDispatcher} and the
- * services needed to execute the query (task runner listener, container
- * allocator, task runner launcher), prepares the query's output/staging
- * directory and routes query, subquery, task and task attempt events to
- * their handlers.
- */
-public class QueryMaster extends CompositeService implements EventHandler {
-  private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
-
-  // Master Context
-  private final MasterContext masterContext;
-
-  // AppMaster Common
-  private final Clock clock;
-  private final long appSubmitTime;
-  private String appName;
-  private final ApplicationAttemptId appAttemptID;
-
-  // For Query
-  private final QueryId queryId;
-  private QueryContext queryContext;
-  private Query query;
-  private MasterPlan masterPlan;
-
-  private AsyncDispatcher dispatcher;
-  private YarnRPC rpc;
-  private RMContainerAllocator rmAllocator;
-  private TaskRunnerListener taskRunnerListener;
-  private TaskRunnerLauncher taskRunnerLauncher;
-
-  // Services of Tajo
-  private CatalogService catalog;
-
-  private boolean isCreateTableStmt;
-  private StorageManager storageManager;
-  private Path outputPath;
-
-  public QueryMaster(final MasterContext masterContext,
-                     final ApplicationAttemptId appAttemptID,
-                     final Clock clock, long appSubmitTime,
-                     MasterPlan masterPlan) {
-    super(QueryMaster.class.getName());
-    this.masterContext = masterContext;
-
-    this.appAttemptID = appAttemptID;
-    this.clock = clock;
-    this.appSubmitTime = appSubmitTime;
-
-    this.queryId = TajoIdUtils.createQueryId(appAttemptID);
-    this.masterPlan = masterPlan;
-    LOG.info("Created Query Master for " + appAttemptID);
-  }
-
-  /**
-   * Creates the query context, the per-query services and the {@link Query},
-   * prepares the staging directory and registers all event dispatchers.
-   *
-   * @throws RuntimeException wrapping any failure raised during initialization
-   */
-  public void init(Configuration _conf) {
-    QueryConf conf = new QueryConf(_conf);
-
-    try {
-      queryContext = new QueryContext(conf);
-
-      // TODO - QueryMaster uses its own dispatcher rather than
-      // masterContext.getDispatcher(); revisit when QueryMaster is separated.
-      dispatcher = new AsyncDispatcher();
-      addIfService(dispatcher);
-
-      // TODO - This comment should be improved when QueryMaster is separated.
-      rpc = masterContext.getYarnRPC();
-
-      catalog = masterContext.getCatalog();
-      storageManager = masterContext.getStorageManager();
-
-      taskRunnerListener = new TaskRunnerListener(queryContext);
-      addIfService(taskRunnerListener);
-
-      rmAllocator = new RMContainerAllocator(queryContext);
-      addIfService(rmAllocator);
-      dispatcher.register(ContainerAllocatorEventType.class, rmAllocator);
-
-      query = new Query(queryContext, queryId, clock, appSubmitTime,
-          "", dispatcher.getEventHandler(), masterPlan, storageManager);
-      initStagingDir();
-
-      // QueryEventDispatcher is already registered in TajoMaster
-      dispatcher.register(QueryEventType.class, query);
-      dispatcher.register(SubQueryEventType.class, new SubQueryEventDispatcher());
-      dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
-      dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
-      dispatcher.register(QueryFinishEvent.EventType.class, new QueryFinishEventHandler());
-      dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
-
-      taskRunnerLauncher = new TaskRunnerLauncherImpl(queryContext);
-      addIfService(taskRunnerLauncher);
-      dispatcher.register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
-
-    } catch (Throwable t) {
-      LOG.error(ExceptionUtils.getStackTrace(t));
-      throw new RuntimeException(t);
-    }
-
-    super.init(conf);
-  }
-
-  /** Starts all child services and then kicks off the query. */
-  public void start() {
-    super.start();
-    startQuery();
-  }
-
-  public void stop() {
-    super.stop();
-  }
-
-  /** Adds {@code object} as a child service if it is a {@link Service}. */
-  protected void addIfService(Object object) {
-    if (object instanceof Service) {
-      addService((Service) object);
-    }
-  }
-
-  /** Sends INIT followed by START to the query through the dispatcher. */
-  public void startQuery() {
-    dispatcher.getEventHandler().handle(new QueryEvent(queryId,
-        QueryEventType.INIT));
-    dispatcher.getEventHandler().handle(new QueryEvent(queryId,
-        QueryEventType.START));
-  }
-
-  @Override
-  public void handle(Event event) {
-    dispatcher.getEventHandler().handle(event);
-  }
-
-  public EventHandler getEventHandler() {
-    return dispatcher.getEventHandler();
-  }
-
-  /** Routes a subquery event to the subquery it names. */
-  private class SubQueryEventDispatcher implements EventHandler<SubQueryEvent> {
-    public void handle(SubQueryEvent event) {
-      SubQueryId id = event.getSubQueryId();
-      query.getSubQuery(id).handle(event);
-    }
-  }
-
-  /** Routes a task event to the task it names. */
-  private class TaskEventDispatcher
-      implements EventHandler<TaskEvent> {
-    public void handle(TaskEvent event) {
-      QueryUnitId taskId = event.getTaskId();
-      QueryUnit task = query.getSubQuery(taskId.getSubQueryId()).
-          getQueryUnit(taskId);
-      task.handle(event);
-    }
-  }
-
-  /** Routes a task attempt event to the attempt it names. */
-  private class TaskAttemptEventDispatcher
-      implements EventHandler<TaskAttemptEvent> {
-    public void handle(TaskAttemptEvent event) {
-      QueryUnitAttemptId attemptId = event.getTaskAttemptId();
-      SubQuery subQuery = query.getSubQuery(attemptId.getSubQueryId());
-      QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
-      QueryUnitAttempt attempt = task.getAttempt(attemptId);
-      attempt.handle(event);
-    }
-  }
-
-  /** Routes a task scheduler event to the scheduler of the named subquery. */
-  private class TaskSchedulerDispatcher
-      implements EventHandler<TaskSchedulerEvent> {
-    public void handle(TaskSchedulerEvent event) {
-      SubQuery subQuery = query.getSubQuery(event.getSubQueryId());
-      subQuery.getTaskScheduler().handle(event);
-    }
-  }
-
-  public QueryContext getContext() {
-    return this.queryContext;
-  }
-
-  /** Per-query view of the QueryMaster's state and services. */
-  public class QueryContext {
-    private QueryConf conf;
-    public Map<ContainerId, ContainerProxy> containers = new ConcurrentHashMap<ContainerId, ContainerProxy>();
-    int minCapability;
-    int maxCapability;
-    int numCluster;
-
-    public QueryContext(QueryConf conf) {
-      this.conf = conf;
-    }
-
-    public QueryConf getConf() {
-      return conf;
-    }
-
-    public AsyncDispatcher getDispatcher() {
-      return dispatcher;
-    }
-
-    public Clock getClock() {
-      return clock;
-    }
-
-    public Query getQuery() {
-      return query;
-    }
-
-    public SubQuery getSubQuery(SubQueryId subQueryId) {
-      return query.getSubQuery(subQueryId);
-    }
-
-    public QueryId getQueryId() {
-      return queryId;
-    }
-
-    public ApplicationId getApplicationId() {
-      return appAttemptID.getApplicationId();
-    }
-
-    public ApplicationAttemptId getApplicationAttemptId() {
-      return appAttemptID;
-    }
-
-    public EventHandler getEventHandler() {
-      return dispatcher.getEventHandler();
-    }
-
-    public YarnRPC getYarnRPC() {
-      return rpc;
-    }
-
-    public InetSocketAddress getRpcAddress() {
-      return masterContext.getClientService().getBindAddress();
-    }
-
-    public InetSocketAddress getTaskListener() {
-      return taskRunnerListener.getBindAddress();
-    }
-
-    public void addContainer(ContainerId cId, ContainerProxy container) {
-      containers.put(cId, container);
-    }
-
-    public void removeContainer(ContainerId cId) {
-      containers.remove(cId);
-    }
-
-    public boolean containsContainer(ContainerId cId) {
-      return containers.containsKey(cId);
-    }
-
-    public ContainerProxy getContainer(ContainerId cId) {
-      return containers.get(cId);
-    }
-
-    public int getNumClusterNode() {
-      return numCluster;
-    }
-
-    public void setNumClusterNodes(int num) {
-      numCluster = num;
-    }
-
-    public CatalogService getCatalog() {
-      return catalog;
-    }
-
-    public Path getOutputPath() {
-      return outputPath;
-    }
-
-    public void setMaxContainerCapability(int capability) {
-      this.maxCapability = capability;
-    }
-
-    public int getMaxContainerCapability() {
-      return this.maxCapability;
-    }
-
-    public void setMinContainerCapability(int capability) {
-      this.minCapability = capability;
-    }
-
-    public int getMinContainerCapability() {
-      return this.minCapability;
-    }
-
-    public boolean isCreateTableQuery() {
-      return isCreateTableStmt;
-    }
-
-    public float getProgress() {
-      return query.getProgress();
-    }
-
-    public long getStartTime() {
-      return query.getStartTime();
-    }
-
-    public long getFinishTime() {
-      return query.getFinishTime();
-    }
-
-    public StorageManager getStorageManager() {
-      return storageManager;
-    }
-  }
-
-  /** Stops all services of this QueryMaster once the query has finished. */
-  private class QueryFinishEventHandler implements EventHandler<QueryFinishEvent> {
-    @Override
-    public void handle(QueryFinishEvent event) {
-      LOG.info("Query end notification started for QueryId : " + query.getId());
-
-      try {
-        // Stop all services
-        // This will also send the final report to the ResourceManager
-        LOG.info("Calling stop for all the services");
-        stop();
-
-      } catch (Throwable t) {
-        LOG.warn("Graceful stop failed ", t);
-      }
-
-      //Bring the process down by force.
-      //Not needed after HADOOP-7140
-      LOG.info("Exiting QueryMaster..GoodBye!");
-      // TODO - to be enabled if query master is separated.
-      //System.exit(0);
-    }
-  }
-
-  // query submission directory is private!
-  final public static FsPermission USER_DIR_PERMISSION =
-      FsPermission.createImmutable((short) 0700); // rwx--------
-
-  /**
-   * Initializes the final output (staging) directory and records it in the
-   * query configuration and in {@link #outputPath}.
-   *
-   * <p>Without an output table name, a directory named after the query id is
-   * created under the user's query directory in the home directory, and the
-   * query id becomes the output table name. With an output table name, a
-   * directory of that name is created under the warehouse directory and the
-   * query is treated as a CREATE TABLE query.
-   *
-   * @throws IOException if the user's query directory is owned by someone
-   *     else, if the staging directory already exists, or on file system errors
-   */
-  private void initStagingDir() throws IOException {
-    QueryConf conf = getContext().getConf();
-
-    String realUser = UserGroupInformation.getLoginUser().getShortUserName();
-    String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
-
-    String givenOutputTableName = conf.getOutputTable();
-    Path stagingDir;
-
-    // If final output directory is not given by an user,
-    // we use the query id as a output directory.
-    if (givenOutputTableName.equals("")) {
-      this.isCreateTableStmt = false;
-      FileSystem fs = FileSystem.get(conf);
-
-      Path homeDirectory = fs.getHomeDirectory();
-      if (!fs.exists(homeDirectory)) {
-        fs.mkdirs(homeDirectory, new FsPermission(USER_DIR_PERMISSION));
-      }
-
-      Path userQueryDir = new Path(homeDirectory, TajoConstants.USER_QUERYDIR_PREFIX);
-
-      if (fs.exists(userQueryDir)) {
-        FileStatus fsStatus = fs.getFileStatus(userQueryDir);
-        String owner = fsStatus.getOwner();
-
-        if (!(owner.equals(currentUser) || owner.equals(realUser))) {
-          throw new IOException("The ownership on the user's query " +
-              "directory " + userQueryDir + " is not as expected. " +
-              "It is owned by " + owner + ". The directory must " +
-              "be owned by the submitter " + currentUser + " or " +
-              "by " + realUser);
-        }
-
-        if (!fsStatus.getPermission().equals(USER_DIR_PERMISSION)) {
-          LOG.info("Permissions on staging directory " + userQueryDir + " are " +
-              "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
-              "to correct value " + USER_DIR_PERMISSION);
-          fs.setPermission(userQueryDir, new FsPermission(USER_DIR_PERMISSION));
-        }
-      } else {
-        fs.mkdirs(userQueryDir, new FsPermission(USER_DIR_PERMISSION));
-      }
-
-      stagingDir = StorageUtil.concatPath(userQueryDir, queryId.toString());
-      mkdirsUnique(fs, stagingDir);
-
-      // Set the query id to the output table name
-      conf.setOutputTable(queryId.toString());
-
-    } else {
-      this.isCreateTableStmt = true;
-      Path warehouseDir = new Path(conf.getVar(TajoConf.ConfVars.ROOT_DIR),
-          TajoConstants.WAREHOUSE_DIR);
-      stagingDir = new Path(warehouseDir, conf.getOutputTable());
-
-      FileSystem fs = warehouseDir.getFileSystem(conf);
-      // TODO - should have appropriate permission
-      mkdirsUnique(fs, stagingDir);
-    }
-
-    conf.setOutputPath(stagingDir);
-    outputPath = stagingDir;
-    LOG.info("Initialized Query Staging Dir: " + outputPath);
-  }
-
-  /**
-   * Creates {@code dir} with {@link #USER_DIR_PERMISSION}.
-   *
-   * @throws IOException if {@code dir} already exists, since every query needs
-   *     its own staging directory, or if the file system call fails
-   */
-  private static void mkdirsUnique(FileSystem fs, Path dir) throws IOException {
-    if (fs.exists(dir)) {
-      throw new IOException("The staging directory " + dir
-          + " already exists. The directory must be unique to each query");
-    }
-    fs.mkdirs(dir, new FsPermission(USER_DIR_PERMISSION));
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryUnit.java
deleted file mode 100644
index 8eb26bc..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryUnit.java
+++ /dev/null
@@ -1,499 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.QueryUnitId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.statistics.TableStat;
-import org.apache.tajo.engine.MasterWorkerProtos.Partition;
-import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.storage.Fragment;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * A single task of a SubQuery. It holds the task's logical plan, input
- * fragments and fetch URIs, keeps track of its attempts, and drives a
- * {@link TaskState} state machine in response to {@link TaskEvent}s.
- */
-public class QueryUnit implements EventHandler<TaskEvent> {
-  /** Class Logger */
-  private static final Log LOG = LogFactory.getLog(QueryUnit.class);
-
-  private QueryUnitId taskId;
-  private EventHandler eventHandler;
-  private StoreTableNode store = null;
-  private LogicalNode plan = null;
-  private List<ScanNode> scan;
-
-  private Map<String, Fragment> fragMap;
-  private Map<String, Set<URI>> fetchMap;
-
-  private List<Partition> partitions;
-  private TableStat stats;
-  private String [] dataLocations;
-  private final boolean isLeafTask;
-  private List<IntermediateEntry> intermediateData;
-
-  private Map<QueryUnitAttemptId, QueryUnitAttempt> attempts;
-  private final int maxAttempts = 3;
-  // number of the most recently created attempt; -1 before the first one
-  private int lastAttemptId;
-
-  private QueryUnitAttemptId successfulAttempt;
-  private String succeededHost;
-  private int succeededPullServerPort;
-
-  private int failedAttempts;
-  // NOTE(review): intended as the total of succeeded, failed and killed
-  // attempts, but within this class it is only incremented on failures.
-  private int finishedAttempts;
-
-  private static final StateMachineFactory
-      <QueryUnit, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
-      new StateMachineFactory
-          <QueryUnit, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
-
-      .addTransition(TaskState.NEW, TaskState.SCHEDULED,
-          TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
-
-      .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
-          TaskEventType.T_ATTEMPT_LAUNCHED)
-
-      .addTransition(TaskState.RUNNING, TaskState.RUNNING,
-          TaskEventType.T_ATTEMPT_LAUNCHED)
-
-      .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
-          TaskEventType.T_ATTEMPT_SUCCEEDED, new AttemptSucceededTransition())
-
-      .addTransition(TaskState.RUNNING,
-          EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
-          TaskEventType.T_ATTEMPT_FAILED, new AttemptFailedTransition())
-
-      .installTopology();
-  private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine;
-
-  private final Lock readLock;
-  private final Lock writeLock;
-
-  public QueryUnit(QueryUnitId id, boolean isLeafTask, EventHandler eventHandler) {
-    this.taskId = id;
-    this.eventHandler = eventHandler;
-    this.isLeafTask = isLeafTask;
-    scan = new ArrayList<ScanNode>();
-    fetchMap = Maps.newHashMap();
-    fragMap = Maps.newHashMap();
-    partitions = new ArrayList<Partition>();
-    attempts = Collections.emptyMap();
-    lastAttemptId = -1;
-    failedAttempts = 0;
-
-    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-    this.readLock = readWriteLock.readLock();
-    this.writeLock = readWriteLock.writeLock();
-
-    stateMachine = stateMachineFactory.make(this);
-  }
-
-  public boolean isLeafTask() {
-    return this.isLeafTask;
-  }
-
-  public void setDataLocations(String [] dataLocations) {
-    this.dataLocations = dataLocations;
-  }
-
-  public String [] getDataLocations() {
-    return this.dataLocations;
-  }
-
-  /** @return the current task state, read under the read lock */
-  public TaskState getState() {
-    readLock.lock();
-    try {
-      return stateMachine.getCurrentState();
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  /**
-   * Sets the logical plan of this task and collects every {@link ScanNode}
-   * reachable from it through unary and binary nodes (depth-first).
-   *
-   * @param plan the root node; its type must be STORE or CREATE_INDEX
-   */
-  public void setLogicalPlan(LogicalNode plan) {
-    Preconditions.checkArgument(plan.getType() == ExprType.STORE ||
-        plan.getType() == ExprType.CREATE_INDEX);
-
-    this.plan = plan;
-    // NOTE(review): this cast assumes a CREATE_INDEX root is also a
-    // StoreTableNode; otherwise it throws ClassCastException. Confirm.
-    store = (StoreTableNode) plan;
-
-    List<LogicalNode> stack = new ArrayList<LogicalNode>();
-    stack.add(plan);
-    while (!stack.isEmpty()) {
-      LogicalNode node = stack.remove(stack.size() - 1);
-      if (node instanceof UnaryNode) {
-        stack.add(((UnaryNode) node).getSubNode());
-      } else if (node instanceof BinaryNode) {
-        BinaryNode binary = (BinaryNode) node;
-        stack.add(binary.getOuterNode());
-        stack.add(binary.getInnerNode());
-      } else if (node instanceof ScanNode) {
-        scan.add((ScanNode) node);
-      }
-    }
-  }
-
-  @Deprecated
-  public void setFragment(String tableId, Fragment fragment) {
-    this.fragMap.put(tableId, fragment);
-    if (fragment.hasDataLocations()) {
-      setDataLocations(fragment.getDataLocations());
-    }
-  }
-
-  public void setFragment2(Fragment fragment) {
-    this.fragMap.put(fragment.getId(), fragment);
-    if (fragment.hasDataLocations()) {
-      setDataLocations(fragment.getDataLocations());
-    }
-  }
-
-  public void addFetch(String tableId, String uri) throws URISyntaxException {
-    this.addFetch(tableId, new URI(uri));
-  }
-
-  /** Adds one fetch URI for {@code tableId}. */
-  public void addFetch(String tableId, URI uri) {
-    fetchSetOf(tableId).add(uri);
-  }
-
-  /** Adds all of {@code urilist} to the fetch URIs of {@code tableId}. */
-  public void addFetches(String tableId, Collection<URI> urilist) {
-    fetchSetOf(tableId).addAll(urilist);
-  }
-
-  /** Returns the fetch URI set of {@code tableId}, creating it if absent. */
-  private Set<URI> fetchSetOf(String tableId) {
-    Set<URI> uris = fetchMap.get(tableId);
-    if (uris == null) {
-      uris = Sets.newHashSet();
-      fetchMap.put(tableId, uris);
-    }
-    return uris;
-  }
-
-  public void setFetches(Map<String, Set<URI>> fetches) {
-    this.fetchMap.clear();
-    this.fetchMap.putAll(fetches);
-  }
-
-  public Fragment getFragment(String tableId) {
-    return this.fragMap.get(tableId);
-  }
-
-  public Collection<Fragment> getAllFragments() {
-    return fragMap.values();
-  }
-
-  public LogicalNode getLogicalPlan() {
-    return this.plan;
-  }
-
-  public QueryUnitId getId() {
-    return taskId;
-  }
-
-  public Collection<URI> getFetchHosts(String tableId) {
-    return fetchMap.get(tableId);
-  }
-
-  public Collection<Set<URI>> getFetches() {
-    return fetchMap.values();
-  }
-
-  public Collection<URI> getFetch(ScanNode scan) {
-    return this.fetchMap.get(scan.getTableId());
-  }
-
-  public String getOutputName() {
-    return this.store.getTableName();
-  }
-
-  public Schema getOutputSchema() {
-    return this.store.getOutSchema();
-  }
-
-  public StoreTableNode getStoreTableNode() {
-    return this.store;
-  }
-
-  public ScanNode[] getScanNodes() {
-    return this.scan.toArray(new ScanNode[scan.size()]);
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append(plan.getType()).append(" \n");
-    for (Entry<String, Fragment> e : fragMap.entrySet()) {
-      sb.append(e.getKey()).append(" : ");
-      sb.append(e.getValue()).append(" ");
-    }
-    for (Entry<String, Set<URI>> e : fetchMap.entrySet()) {
-      sb.append(e.getKey()).append(" : ");
-      for (URI t : e.getValue()) {
-        sb.append(t).append(" ");
-      }
-    }
-    return sb.toString();
-  }
-
-  public void setStats(TableStat stats) {
-    this.stats = stats;
-  }
-
-  public void setPartitions(List<Partition> partitions) {
-    this.partitions = Collections.unmodifiableList(partitions);
-  }
-
-  public TableStat getStats() {
-    return this.stats;
-  }
-
-  public List<Partition> getPartitions() {
-    return this.partitions;
-  }
-
-  public int getPartitionNum() {
-    return this.partitions.size();
-  }
-
-  /**
-   * Creates a new attempt with the next attempt number. The attempt is not
-   * registered in the attempt map by this method.
-   */
-  public QueryUnitAttempt newAttempt() {
-    QueryUnitAttempt attempt = new QueryUnitAttempt(
-        QueryIdFactory.newQueryUnitAttemptId(this.getId(),
-            ++lastAttemptId), this, eventHandler);
-    return attempt;
-  }
-
-  public QueryUnitAttempt getAttempt(QueryUnitAttemptId attemptId) {
-    return attempts.get(attemptId);
-  }
-
-  public QueryUnitAttempt getAttempt(int attempt) {
-    return this.attempts.get(new QueryUnitAttemptId(this.getId(), attempt));
-  }
-
-  /**
-   * @return the attempt registered under the latest attempt number, or
-   *     {@code null} if no such attempt is registered
-   */
-  public QueryUnitAttempt getLastAttempt() {
-    // The map is keyed by QueryUnitAttemptId, so the attempt number has to be
-    // turned into an id; looking it up as an Integer key always missed.
-    return getAttempt(this.lastAttemptId);
-  }
-
-  protected QueryUnitAttempt getSuccessfulAttempt() {
-    readLock.lock();
-    try {
-      if (null == successfulAttempt) {
-        return null;
-      }
-      return attempts.get(successfulAttempt);
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  public int getRetryCount () {
-    return this.lastAttemptId;
-  }
-
-  private static class InitialScheduleTransition implements
-      SingleArcTransition<QueryUnit, TaskEvent> {
-
-    @Override
-    public void transition(QueryUnit task, TaskEvent taskEvent) {
-      task.addAndScheduleAttempt();
-    }
-  }
-
-  // This is always called in the Write Lock
-  private void addAndScheduleAttempt() {
-    // Create new task attempt
-    QueryUnitAttempt attempt = newAttempt();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Created attempt " + attempt.getId());
-    }
-    // Start with an immutable singleton map and switch to a mutable map only
-    // once a second attempt is added.
-    switch (attempts.size()) {
-      case 0:
-        attempts = Collections.singletonMap(attempt.getId(), attempt);
-        break;
-
-      case 1:
-        Map<QueryUnitAttemptId, QueryUnitAttempt> newAttempts
-            = new LinkedHashMap<QueryUnitAttemptId, QueryUnitAttempt>(3);
-        newAttempts.putAll(attempts);
-        attempts = newAttempts;
-        attempts.put(attempt.getId(), attempt);
-        break;
-
-      default:
-        attempts.put(attempt.getId(), attempt);
-        break;
-    }
-
-    if (failedAttempts > 0) {
-      eventHandler.handle(new TaskAttemptEvent(attempt.getId(),
-          TaskAttemptEventType.TA_RESCHEDULE));
-    } else {
-      eventHandler.handle(new TaskAttemptEvent(attempt.getId(),
-          TaskAttemptEventType.TA_SCHEDULE));
-    }
-  }
-
-  private static class AttemptSucceededTransition
-      implements SingleArcTransition<QueryUnit, TaskEvent>{
-
-    @Override
-    public void transition(QueryUnit task,
-                           TaskEvent event) {
-      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
-      QueryUnitAttempt attempt = task.attempts.get(
-          attemptEvent.getTaskAttemptId());
-      task.successfulAttempt = attemptEvent.getTaskAttemptId();
-      task.succeededHost = attempt.getHost();
-      task.succeededPullServerPort = attempt.getPullServerPort();
-      task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(),
-          SubQueryEventType.SQ_TASK_COMPLETED));
-    }
-  }
-
-  /**
-   * Counts a failed attempt. Below {@code maxAttempts} failures a new attempt
-   * is scheduled (unless one already succeeded); at the limit the SubQuery is
-   * notified with {@code SQ_FAILED} and the task becomes FAILED.
-   */
-  private static class AttemptFailedTransition implements
-      MultipleArcTransition<QueryUnit, TaskEvent, TaskState> {
-
-    @Override
-    public TaskState transition(QueryUnit task, TaskEvent taskEvent) {
-      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent;
-      LOG.info("=============================================================");
-      LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<");
-      LOG.info("=============================================================");
-      task.failedAttempts++;
-      task.finishedAttempts++;
-
-      if (task.failedAttempts < task.maxAttempts) {
-        if (task.successfulAttempt == null) {
-          task.addAndScheduleAttempt();
-        }
-      } else {
-        task.eventHandler.handle(
-            new SubQueryTaskEvent(task.getId(), SubQueryEventType.SQ_FAILED));
-        return TaskState.FAILED;
-      }
-
-      return task.getState();
-    }
-  }
-
-  /**
-   * Feeds an event into the task state machine while holding the write lock.
-   * An invalid transition is logged and reported to the query as
-   * {@code INTERNAL_ERROR}.
-   */
-  @Override
-  public void handle(TaskEvent event) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing " + event.getTaskId() + " of type "
-          + event.getType());
-    }
-
-    // Acquire before entering try so finally never unlocks a lock not held.
-    writeLock.lock();
-    try {
-      TaskState oldState = getState();
-      try {
-        stateMachine.doTransition(event.getType(), event);
-      } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state", e);
-        eventHandler.handle(new QueryEvent(getId().getQueryId(),
-            QueryEventType.INTERNAL_ERROR));
-      }
-
-      // log the state change, if any
-      if (LOG.isDebugEnabled()) {
-        if (oldState != getState()) {
-          LOG.debug(taskId + " Task Transitioned from " + oldState + " to "
-              + getState());
-        }
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  public void setIntermediateData(Collection<IntermediateEntry> partitions) {
-    this.intermediateData = new ArrayList<IntermediateEntry>(partitions);
-  }
-
-  public List<IntermediateEntry> getIntermediateData() {
-    return this.intermediateData;
-  }
-
-  /** Location of one partition of intermediate output and its pull server. */
-  public static class IntermediateEntry {
-    int taskId;
-    int attemptId;
-    int partitionId;
-    String pullHost;
-    int port;
-
-    public IntermediateEntry(int taskId, int attemptId, int partitionId,
-                             String pullServerAddr, int pullServerPort) {
-      this.taskId = taskId;
-      this.attemptId = attemptId;
-      this.partitionId = partitionId;
-      this.pullHost = pullServerAddr;
-      this.port = pullServerPort;
-    }
-
-    public int getTaskId() {
-      return this.taskId;
-    }
-
-    public int getAttemptId() {
-      return this.attemptId;
-    }
-
-    public int getPartitionId() {
-      return this.partitionId;
-    }
-
-    public String getPullHost() {
-      return this.pullHost;
-    }
-
-    public int getPullPort() {
-      return port;
-    }
-
-    /** @return {@code host:port} of the pull server */
-    public String getPullAddress() {
-      return pullHost + ":" + port;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryUnitAttempt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryUnitAttempt.java
deleted file mode 100644
index cf95a9e..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryUnitAttempt.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoProtos.TaskAttemptState;
-import org.apache.tajo.catalog.statistics.TableStat;
-import org.apache.tajo.engine.MasterWorkerProtos.Partition;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskCompletionReport;
-import org.apache.tajo.master.QueryUnit.IntermediateEntry;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-
-import java.util.*;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
-
- private static final Log LOG = LogFactory.getLog(QueryUnitAttempt.class);
-
- private final static int EXPIRE_TIME = 15000;
-
- private final QueryUnitAttemptId id;
- private final QueryUnit queryUnit;
- final EventHandler eventHandler;
-
- private String hostName;
- private int port;
- private int expire;
-
- private final Lock readLock;
- private final Lock writeLock;
-
- private final List<String> diagnostics = new ArrayList<String>();
-
- private static final StateMachineFactory
- <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
- stateMachineFactory = new StateMachineFactory
- <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
- (TaskAttemptState.TA_NEW)
-
- .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
- TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition())
- .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
- TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition())
-
- .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED,
- TaskAttemptEventType.TA_ASSIGNED, new LaunchTransition())
-
- // from assigned
- .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED,
- TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition())
- .addTransition(TaskAttemptState.TA_ASSIGNED,
- EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
- TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
-
- .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED,
- TaskAttemptEventType.TA_DONE, new SucceededTransition())
-
- .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED,
- TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
-
- // from running
- .addTransition(TaskAttemptState.TA_RUNNING,
- EnumSet.of(TaskAttemptState.TA_RUNNING),
- TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
-
- .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED,
- TaskAttemptEventType.TA_DONE, new SucceededTransition())
-
- .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED,
- TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
-
- .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
- TaskAttemptEventType.TA_UPDATE)
- .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
- TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition())
- .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED,
- TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
-
- .installTopology();
-
- private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
- stateMachine;
-
-
- public QueryUnitAttempt(final QueryUnitAttemptId id, final QueryUnit queryUnit,
- final EventHandler eventHandler) {
- this.id = id;
- this.expire = QueryUnitAttempt.EXPIRE_TIME;
- this.queryUnit = queryUnit;
- this.eventHandler = eventHandler;
-
- ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- this.readLock = readWriteLock.readLock();
- this.writeLock = readWriteLock.writeLock();
-
- stateMachine = stateMachineFactory.make(this);
- }
-
- public TaskAttemptState getState() {
- readLock.lock();
- try {
- return stateMachine.getCurrentState();
- } finally {
- readLock.unlock();
- }
- }
-
- public QueryUnitAttemptId getId() {
- return this.id;
- }
-
- public boolean isLeafTask() {
- return this.queryUnit.isLeafTask();
- }
-
- public QueryUnit getQueryUnit() {
- return this.queryUnit;
- }
-
- public String getHost() {
- return this.hostName;
- }
-
- public void setHost(String host) {
- this.hostName = host;
- }
-
- public void setPullServerPort(int port) {
- this.port = port;
- }
-
- public int getPullServerPort() {
- return port;
- }
-
- public synchronized void setExpireTime(int expire) {
- this.expire = expire;
- }
-
- public synchronized void updateExpireTime(int period) {
- this.setExpireTime(this.expire - period);
- }
-
- public synchronized void resetExpireTime() {
- this.setExpireTime(QueryUnitAttempt.EXPIRE_TIME);
- }
-
- public int getLeftTime() {
- return this.expire;
- }
-
- private void fillTaskStatistics(TaskCompletionReport report) {
- if (report.getPartitionsCount() > 0) {
- this.getQueryUnit().setPartitions(report.getPartitionsList());
-
- List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
- for (Partition p : report.getPartitionsList()) {
- IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(),
- getId().getId(), p.getPartitionKey(), getHost(), getPullServerPort());
- partitions.add(entry);
- }
- this.getQueryUnit().setIntermediateData(partitions);
- }
- if (report.hasResultStats()) {
- this.getQueryUnit().setStats(new TableStat(report.getResultStats()));
- }
- }
-
- private static class TaskAttemptScheduleTransition implements
- SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
-
- @Override
- public void transition(QueryUnitAttempt taskAttempt,
- TaskAttemptEvent taskAttemptEvent) {
-
- if (taskAttempt.isLeafTask()
- && taskAttempt.getQueryUnit().getScanNodes().length == 1) {
- Set<String> racks = new HashSet<String>();
- for (String host : taskAttempt.getQueryUnit().getDataLocations()) {
- racks.add(RackResolver.resolve(host).getNetworkLocation());
- }
-
- taskAttempt.eventHandler.handle(new TaskScheduleEvent(
- taskAttempt.getId(), EventType.T_SCHEDULE, true,
- taskAttempt.getQueryUnit().getDataLocations(),
- racks.toArray(new String[racks.size()])
- ));
- } else {
- taskAttempt.eventHandler.handle(new TaskScheduleEvent(
- taskAttempt.getId(), EventType.T_SCHEDULE,
- false,
- null,
- null
- ));
- }
- }
- }
-
- private static class LaunchTransition
- implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
-
- @Override
- public void transition(QueryUnitAttempt taskAttempt,
- TaskAttemptEvent event) {
- TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
- taskAttempt.setHost(castEvent.getHostName());
- taskAttempt.setPullServerPort(castEvent.getPullServerPort());
- taskAttempt.eventHandler.handle(
- new TaskTAttemptEvent(taskAttempt.getId(),
- TaskEventType.T_ATTEMPT_LAUNCHED));
- }
- }
-
- private static class StatusUpdateTransition
- implements MultipleArcTransition<QueryUnitAttempt, TaskAttemptEvent, TaskAttemptState> {
-
- @Override
- public TaskAttemptState transition(QueryUnitAttempt taskAttempt,
- TaskAttemptEvent event) {
- TaskAttemptStatusUpdateEvent updateEvent =
- (TaskAttemptStatusUpdateEvent) event;
-
- switch (updateEvent.getStatus().getState()) {
- case TA_PENDING:
- case TA_RUNNING:
- return TaskAttemptState.TA_RUNNING;
-
- default:
- return taskAttempt.getState();
- }
- }
- }
-
- private void addDiagnosticInfo(String diag) {
- if (diag != null && !diag.equals("")) {
- diagnostics.add(diag);
- }
- }
-
- private static class AlreadyAssignedTransition
- implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
-
- @Override
- public void transition(QueryUnitAttempt queryUnitAttempt,
- TaskAttemptEvent taskAttemptEvent) {
- LOG.info(">>>>>>>>> Already Assigned: " + queryUnitAttempt.getId());
- }
- }
-
- private static class AlreadyDoneTransition
- implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
-
- @Override
- public void transition(QueryUnitAttempt queryUnitAttempt,
- TaskAttemptEvent taskAttemptEvent) {
- LOG.info(">>>>>>>>> Already Done: " + queryUnitAttempt.getId());
- }
- }
-
- private static class SucceededTransition
- implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
- @Override
- public void transition(QueryUnitAttempt taskAttempt,
- TaskAttemptEvent event) {
- TaskCompletionReport report = ((TaskCompletionEvent)event).getReport();
-
- taskAttempt.fillTaskStatistics(report);
- taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(),
- TaskEventType.T_ATTEMPT_SUCCEEDED));
- }
- }
-
- private static class FailedTransition
- implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
- @Override
- public void transition(QueryUnitAttempt taskAttempt,
- TaskAttemptEvent event) {
- TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
- taskAttempt.eventHandler.handle(
- new TaskTAttemptEvent(taskAttempt.getId(),
- TaskEventType.T_ATTEMPT_FAILED));
- LOG.error("FROM " + taskAttempt.getHost() + " >> "
- + errorEvent.errorMessage());
- taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
- }
- }
-
- @Override
- public void handle(TaskAttemptEvent event) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing " + event.getTaskAttemptId() + " of type "
- + event.getType());
- }
- try {
- writeLock.lock();
- TaskAttemptState oldState = getState();
- try {
- stateMachine.doTransition(event.getType(), event);
- } catch (InvalidStateTransitonException e) {
- LOG.error("Can't handle this event at current state of "
- + event.getTaskAttemptId() + ")", e);
- eventHandler.handle(new QueryEvent(getId().getQueryId(),
- QueryEventType.INTERNAL_ERROR));
- }
-
- //notify the eventhandler of state change
- if (LOG.isDebugEnabled()) {
- if (oldState != getState()) {
- LOG.debug(id + " TaskAttempt Transitioned from " + oldState + " to "
- + getState());
- }
- }
- }
-
- finally {
- writeLock.unlock();
- }
- }
-}