You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC
svn commit: r1457129 [3/38] - in /incubator/tez: ./ tez-ampool/
tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/
tez-ampool/src/main/conf/ tez-ampool/src/main/java/
tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-amp...
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolConfiguration.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolConfiguration.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolConfiguration.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+
+/**
+ * Configuration keys and default values used by the Tez AMPool service.
+ *
+ * <p>Loading this class registers the AMPool default, site and app XML files
+ * as default resources through
+ * {@link Configuration#addDefaultResource(String)}.
+ */
+public class AMPoolConfiguration extends Configuration {
+
+  private static final String AMPOOL_DEFAULT_XML_FILE = "tez-ampool-default.xml";
+  private static final String AMPOOL_SITE_XML_FILE = "tez-ampool-site.xml";
+  public static final String AMPOOL_APP_XML_FILE = "tez-ampool-app.xml";
+
+  /** Name of the AMPool application. */
+  public static final String APP_NAME = "AMPoolService";
+
+  static {
+    // Registration order is kept: default, then site, then app.
+    final String[] defaultResources = {
+      AMPOOL_DEFAULT_XML_FILE,
+      AMPOOL_SITE_XML_FILE,
+      AMPOOL_APP_XML_FILE
+    };
+    for (String resource : defaultResources) {
+      Configuration.addDefaultResource(resource);
+    }
+  }
+
+  // ---------------------------------------------------------------------
+  // Configuration keys and their defaults
+  // ---------------------------------------------------------------------
+
+  /** Common prefix of every AMPool configuration key. */
+  public static final String AMPOOL_PREFIX = "tez.ampool.";
+
+  // ---- Client ----
+
+  public static final String AM_MASTER_MEMORY_MB = AMPOOL_PREFIX + "am.master_memory";
+  public static final int DEFAULT_AM_MASTER_MEMORY_MB = 1024;
+
+  public static final String AM_MASTER_QUEUE = AMPOOL_PREFIX + "am.master_queue";
+  public static final String DEFAULT_AM_MASTER_QUEUE = "default";
+
+  // ---- Server ----
+
+  /** Key for the port of the AMPool web server. */
+  public static final String WS_PORT = AMPOOL_PREFIX + "ws.port";
+  public static final int DEFAULT_WS_PORT = 12999;
+
+  // ---- AM ----
+
+  public static final String AM_POOL_SIZE = AMPOOL_PREFIX + "am-pool-size";
+  public static final int DEFAULT_AM_POOL_SIZE = 3;
+
+  public static final String MAX_AM_POOL_SIZE = AMPOOL_PREFIX + "max-am-pool-size";
+  public static final int DEFAULT_MAX_AM_POOL_SIZE = 5;
+
+  public static final String AM_LAUNCH_NEW_AM_AFTER_APP_COMPLETION =
+      AMPOOL_PREFIX + "launch-new-am-after-app-completion";
+  public static final boolean DEFAULT_AM_LAUNCH_NEW_AM_AFTER_APP_COMPLETION = true;
+
+  public static final String MAX_AM_LAUNCH_FAILURES =
+      AMPOOL_PREFIX + "max-am-launch-failures";
+  public static final int DEFAULT_MAX_AM_LAUNCH_FAILURES = 10;
+
+  public static final String AM_STAGING_DIR = AMPOOL_PREFIX + "am.staging-dir";
+  public static final String DEFAULT_AM_STAGING_DIR = "/tmp/tez/ampool/staging/";
+
+  // ---- RM client proxy ----
+
+  public static final String RM_PROXY_CLIENT_THREAD_COUNT =
+      AMPOOL_PREFIX + "rm-proxy-client.thread-count";
+  public static final int DEFAULT_RM_PROXY_CLIENT_THREAD_COUNT = 10;
+
+  public static final String RM_PROXY_CLIENT_ADDRESS = AMPOOL_PREFIX + "address";
+  public static final int DEFAULT_RM_PROXY_CLIENT_PORT = 10030;
+  public static final String DEFAULT_RM_PROXY_CLIENT_ADDRESS =
+      "0.0.0.0:" + DEFAULT_RM_PROXY_CLIENT_PORT;
+
+  // ---- MR AM related ----
+
+  /** Memory to allocate for the lazy AM. */
+  public static final String MR_AM_MEMORY_ALLOCATION_MB =
+      AMPOOL_PREFIX + "mr-am.memory-allocation-mb";
+  public static final int DEFAULT_MR_AM_MEMORY_ALLOCATION_MB = 1536;
+
+  /** Queue in which to launch the LazyMRAM. */
+  public static final String MR_AM_QUEUE_NAME = AMPOOL_PREFIX + "mr-am.queue-name";
+
+  public static final String MR_AM_JOB_JAR_PATH = AMPOOL_PREFIX + "mr-am.job-jar-path";
+  public static final String DEFAULT_MR_AM_JOB_JAR_PATH = "";
+
+  public static final String APPLICATION_MASTER_CLASS =
+      AMPOOL_PREFIX + "mr-am.application-master-class";
+  public static final String DEFAULT_APPLICATION_MASTER_CLASS =
+      "org.apache.hadoop.mapreduce.v2.app2.lazy.LazyMRAppMaster";
+
+  /** Name of an environment variable (not an AMPool-prefixed config key). */
+  public static final String LAZY_AM_POLLING_URL_ENV = "LAZY_AM_POLLING_URL";
+
+  public static final String TMP_DIR_PATH = AMPOOL_PREFIX + "tmp-dir-path";
+  public static final String DEFAULT_TMP_DIR_PATH = "/tmp/ampoolservice/";
+
+  public static final String LAZY_AM_CONF_FILE_PATH =
+      AMPOOL_PREFIX + "lazy-am-conf-file-path";
+
+  /**
+   * CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
+   * entries.
+   */
+  public static final String AMPOOL_APPLICATION_CLASSPATH =
+      AMPOOL_PREFIX + "application.classpath";
+
+  /**
+   * Default CLASSPATH entries for YARN applications, one array element per
+   * entry.
+   */
+  public static final String[] DEFAULT_AMPOOL_APPLICATION_CLASSPATH = {
+    ApplicationConstants.Environment.HADOOP_CONF_DIR.$(),
+    under(ApplicationConstants.Environment.HADOOP_COMMON_HOME, "/share/hadoop/common/*"),
+    under(ApplicationConstants.Environment.HADOOP_COMMON_HOME, "/share/hadoop/common/lib/*"),
+    under(ApplicationConstants.Environment.HADOOP_HDFS_HOME, "/share/hadoop/hdfs/*"),
+    under(ApplicationConstants.Environment.HADOOP_HDFS_HOME, "/share/hadoop/hdfs/lib/*"),
+    under(ApplicationConstants.Environment.HADOOP_YARN_HOME, "/share/hadoop/yarn/*"),
+    under(ApplicationConstants.Environment.HADOOP_YARN_HOME, "/share/hadoop/yarn/lib/*")
+  };
+
+  /** Creates a configuration by delegating to the no-argument super constructor. */
+  public AMPoolConfiguration() {
+    super();
+  }
+
+  /**
+   * Creates a configuration as a copy of {@code conf}.
+   *
+   * <p>When {@code conf} is not itself an {@code AMPoolConfiguration},
+   * {@link #reloadConfiguration()} is invoked after copying.
+   *
+   * @param conf the configuration to copy
+   */
+  public AMPoolConfiguration(Configuration conf) {
+    super(conf);
+    if (conf instanceof AMPoolConfiguration) {
+      return;
+    }
+    this.reloadConfiguration();
+  }
+
+  /** Appends {@code relativePath} to the expansion expression of {@code home}. */
+  private static String under(ApplicationConstants.Environment home,
+      String relativePath) {
+    return home.$() + relativePath;
+  }
+
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolContext.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolContext.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolContext.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.AMRMClient;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.ampool.manager.AMPoolManager;
+
+/**
+ * Shared context handed to the AMPool components. It bundles the event
+ * dispatcher, the YARN and AM-RM clients, the pool manager, the node manager
+ * host name and the cluster's minimum/maximum resource capabilities.
+ */
+public class AMPoolContext {
+
+  // Collaborators fixed at construction time.
+  private final Dispatcher dispatcher;
+  private final YarnClient rmYarnClient;
+  private final AMRMClient amRMClient;
+  private final AMPoolManager amPoolManager;
+  private final String nmHost;
+
+  // Mutable state; accessed only through the synchronized accessors below.
+  private Resource minResourceCapability;
+  private Resource maxResourceCapability;
+
+  /**
+   * @param conf configuration; accepted but not currently used (see TODO)
+   * @param nmHost host name returned by {@link #getNMHost()}
+   * @param dispatcher event dispatcher returned by {@link #getDispatcher()}
+   * @param yarnClient client returned by {@link #getRMYarnClient()}
+   * @param amRMClient client returned by {@link #getAMRMClient()}
+   * @param amPoolManager manager to which application calls are delegated
+   */
+  public AMPoolContext(Configuration conf, String nmHost,
+      Dispatcher dispatcher, YarnClient yarnClient, AMRMClient amRMClient,
+      AMPoolManager amPoolManager) {
+    // TODO
+    this.nmHost = nmHost;
+    this.dispatcher = dispatcher;
+    this.rmYarnClient = yarnClient;
+    this.amRMClient = amRMClient;
+    this.amPoolManager = amPoolManager;
+  }
+
+  // ---- plain accessors ----
+
+  public Dispatcher getDispatcher() {
+    return this.dispatcher;
+  }
+
+  public YarnClient getRMYarnClient() {
+    return this.rmYarnClient;
+  }
+
+  public AMRMClient getAMRMClient() {
+    return this.amRMClient;
+  }
+
+  public String getNMHost() {
+    return this.nmHost;
+  }
+
+  // ---- resource capabilities ----
+
+  public synchronized Resource getMinResourceCapability() {
+    return this.minResourceCapability;
+  }
+
+  public synchronized void setMinResourceCapability(Resource resource) {
+    this.minResourceCapability = resource;
+  }
+
+  public synchronized Resource getMaxResourceCapability() {
+    return this.maxResourceCapability;
+  }
+
+  public synchronized void setMaxResourceCapability(Resource resource) {
+    this.maxResourceCapability = resource;
+  }
+
+  // ---- operations delegated to the pool manager ----
+
+  /** Returns the application id obtained from {@code AMPoolManager.getAM()}. */
+  public ApplicationId getNewApplicationId() {
+    return this.amPoolManager.getAM();
+  }
+
+  /** Looks up the AM context for {@code applicationId} in the pool manager. */
+  public AMContext getAMContext(ApplicationId applicationId) {
+    return this.amPoolManager.getAMContext(applicationId);
+  }
+
+  /** Asks the pool manager whether {@code applicationId} is a managed app. */
+  public boolean isManagedApp(ApplicationId applicationId) {
+    return this.amPoolManager.isManagedApp(applicationId);
+  }
+
+  /**
+   * Forwards the submission to the pool manager and returns a newly created
+   * response record.
+   *
+   * @param request the submission request passed on to the pool manager
+   * @return a fresh {@link SubmitApplicationResponse} record
+   */
+  public SubmitApplicationResponse submitApplication(
+      SubmitApplicationRequest request) {
+    this.amPoolManager.submitApplication(request);
+    return Records.newRecord(SubmitApplicationResponse.class);
+  }
+
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolService.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolService.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolService.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolService.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.client.AMRMClient;
+import org.apache.hadoop.yarn.client.AMRMClientImpl;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.ampool.manager.AMPoolEventType;
+import org.apache.tez.ampool.manager.AMPoolManager;
+import org.apache.tez.ampool.rest.AMPoolStatusService;
+import org.apache.tez.ampool.rest.ApplicationPollService;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+
+import com.sun.jersey.spi.container.servlet.ServletContainer;
+
+/**
+ * Composite service that wires together the AMPool components: the event
+ * dispatcher, the YARN client, the pool manager, the AM monitor, the client
+ * RM proxy and an embedded Jetty server exposing the REST resources found in
+ * {@code org.apache.tez.ampool.rest}.
+ *
+ * <p>The service runs in one of two modes. In CLI mode ({@code inCLIMode})
+ * no {@link AMRMClient} and no {@link RMHeartbeatService} are created. In the
+ * other mode the container id and node manager host/port are read from the
+ * environment (see {@link #main(String[])}).
+ */
+public class AMPoolService extends CompositeService {
+
+  private static final Log LOG = LogFactory.getLog(AMPoolService.class);
+
+  private final long startTime = System.currentTimeMillis();
+  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+  private AMPoolContext context;
+  private AMPoolClientRMProxy clientRMProxy;
+  private AMPoolManager amPoolManager;
+  private RMHeartbeatService rmHeartbeatService;
+  private AMMonitorService amMonitorService;
+  private Server webAppServer = null;
+  private Configuration conf;
+
+  private YarnClient yarnRMClient;
+  private AMRMClient amRMClient;
+
+  private Dispatcher dispatcher;
+
+  private final ApplicationAttemptId applicationAttemptId;
+  private final ContainerId containerId;
+  private final String nmHost;
+  private final int nmPort;
+  private int webAppServerPort;
+  private String trackerUrl;
+
+  private final boolean inCLIMode;
+
+  /** Working directory path; set in {@link #init(Configuration)}. */
+  public String tmpAppDirPath = null;
+
+  private FileSystem localFs;
+
+  private static final Options opts;
+
+  static {
+    opts = new Options();
+    opts.addOption("cli", false, "Run via command-line in non-AM mode");
+  }
+
+  /**
+   * Creates the service without logging; the other constructors delegate
+   * here.
+   *
+   * @param applicationAttemptId attempt id of this AM ({@code null} in CLI mode)
+   * @param containerId container id of this AM ({@code null} in CLI mode)
+   * @param nmHost host name used for the tracker URL and passed to the
+   *     pool manager and context
+   * @param nmPort node manager port ({@code -1} in CLI mode)
+   * @param inCLIMode whether the service runs from the command line
+   */
+  public AMPoolService(ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId, String nmHost, int nmPort,
+      boolean inCLIMode) {
+    super(AMPoolService.class.getName());
+    this.applicationAttemptId = applicationAttemptId;
+    this.containerId = containerId;
+    this.nmHost = nmHost;
+    this.nmPort = nmPort;
+    this.inCLIMode = inCLIMode;
+  }
+
+  /** Creates the service in non-CLI mode and logs its parameters. */
+  public AMPoolService(ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId, String nmHost, int nmPort) {
+    this(applicationAttemptId, containerId, nmHost, nmPort, false);
+    LOG.info("Created AMPoolService"
+        + ", applicationAttemptId=" + applicationAttemptId
+        + ", containerId=" + containerId
+        + ", nmHost=" + nmHost
+        + ", nmPort=" + nmPort
+        + ", launchTime=" + startTime
+        + ", inCLIMode=" + inCLIMode);
+  }
+
+  /** Creates the service in CLI mode, using {@code localhost} as the host. */
+  public AMPoolService(String localhost) {
+    this(null, null, localhost, -1, true);
+    LOG.info("Created AMPoolService"
+        + ", host=" + nmHost
+        + ", launchTime=" + startTime
+        + ", inCLIMode=" + inCLIMode);
+  }
+
+  /**
+   * Builds and registers all child services, configures the web server and
+   * then initializes the composite.
+   *
+   * @param conf configuration used for the local file system, the working
+   *     directory path and the web server port
+   * @throws RuntimeException if the local file system cannot be obtained
+   */
+  @Override
+  public void init(final Configuration conf) {
+    this.conf = conf;
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    dispatcher = new AsyncDispatcher();
+    addIfService(dispatcher);
+
+    yarnRMClient = new YarnClientImpl();
+    addIfService(yarnRMClient);
+
+    if (inCLIMode) {
+      amRMClient = null;
+    } else {
+      amRMClient = new AMRMClientImpl(applicationAttemptId);
+      addIfService(amRMClient);
+    }
+
+    tmpAppDirPath = conf.get(AMPoolConfiguration.TMP_DIR_PATH,
+        AMPoolConfiguration.DEFAULT_TMP_DIR_PATH)
+        + File.separator;
+
+    amPoolManager = new AMPoolManager(conf, dispatcher, yarnRMClient, nmHost,
+        inCLIMode, tmpAppDirPath);
+    dispatcher.register(AMPoolEventType.class, amPoolManager);
+    addIfService(amPoolManager);
+
+    context = new AMPoolContext(conf, nmHost, dispatcher,
+        yarnRMClient, amRMClient, amPoolManager);
+
+    amMonitorService = new AMMonitorService(context);
+    dispatcher.register(AMMonitorEventType.class, amMonitorService);
+    addIfService(amMonitorService);
+
+    clientRMProxy = new AMPoolClientRMProxy(context);
+    addIfService(clientRMProxy);
+
+    webAppServerPort = conf.getInt(AMPoolConfiguration.WS_PORT,
+        AMPoolConfiguration.DEFAULT_WS_PORT);
+    trackerUrl = "http://" + nmHost
+        + ":" + webAppServerPort
+        + "/master/status";
+
+    if (!inCLIMode) {
+      rmHeartbeatService = new RMHeartbeatService(context, trackerUrl,
+          webAppServerPort);
+      addIfService(rmHeartbeatService);
+    }
+
+    initWebAppServer();
+
+    ApplicationPollService.init(amPoolManager);
+    AMPoolStatusService.init(amPoolManager);
+
+    super.init(conf);
+  }
+
+  /**
+   * Creates the Jetty server on {@code webAppServerPort} with a default
+   * servlet at "/" and a Jersey servlet container at "/*".
+   */
+  private void initWebAppServer() {
+    webAppServer = new Server(webAppServerPort);
+    webAppServer.setThreadPool(new QueuedThreadPool(50));
+    webAppServer.setStopAtShutdown(true);
+
+    ServletContextHandler root = new ServletContextHandler(webAppServer, "/");
+    ServletHolder rootServlet = root.addServlet(DefaultServlet.class, "/");
+    rootServlet.setInitOrder(1);
+
+    ServletHolder restHandler = new ServletHolder(ServletContainer.class);
+    restHandler.setInitParameter(
+        "com.sun.jersey.config.property.resourceConfigClass",
+        "com.sun.jersey.api.core.PackagesResourceConfig");
+    restHandler.setInitParameter(
+        "com.sun.jersey.config.property.packages",
+        "org.apache.tez.ampool.rest");
+    restHandler.setInitParameter("com.sun.jersey.api.json.POJOMappingFeature",
+        "true");
+    root.addServlet(restHandler, "/*");
+    restHandler.setInitOrder(2);
+  }
+
+  /**
+   * Starts the child services and then the web server.
+   *
+   * @throws RuntimeException if the web server fails to start
+   */
+  @Override
+  public void start() {
+    // Skip building the message entirely when debug logging is disabled.
+    if (!inCLIMode && LOG.isDebugEnabled()) {
+      LOG.debug("Starting AMPoolService"
+          + ", applicationAttemptId=" + applicationAttemptId
+          + ", containerId=" + containerId
+          + ", nmHost=" + nmHost
+          + ", nmPort=" + nmPort
+          + ", launchTime=" + startTime
+          + ", startingTime=" + System.currentTimeMillis()
+          + ", inCLIMode=" + inCLIMode);
+    }
+    super.start();
+    try {
+      webAppServer.start();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    if (!inCLIMode) {
+      LOG.info("Started AMPoolService"
+          + ", applicationAttemptId=" + applicationAttemptId
+          + ", containerId=" + containerId
+          + ", nmHost=" + nmHost
+          + ", nmPort=" + nmPort
+          + ", launchTime=" + startTime
+          + ", startedTime=" + System.currentTimeMillis()
+          + ", inCLIMode=" + inCLIMode);
+    } else {
+      LOG.info("Started AMPoolService. Use " + trackerUrl
+          + " to monitor the AMPoolService");
+    }
+  }
+
+  /**
+   * Stops the web server (if one was created) and then the child services.
+   * A failure to stop the web server is logged and does not prevent the
+   * child services from being stopped.
+   */
+  @Override
+  public void stop() {
+    if (webAppServer != null) {
+      try {
+        webAppServer.stop();
+      } catch (Exception e) {
+        // Best effort: report through the service log rather than stderr.
+        LOG.warn("Failed to stop AMPoolService web server", e);
+      }
+    }
+    super.stop();
+  }
+
+  /** Registers {@code object} as a child service if it is a {@link Service}. */
+  protected void addIfService(Object object) {
+    if (object instanceof Service) {
+      addService((Service) object);
+    }
+  }
+
+  /**
+   * Fails with an {@link IOException} when a required value is missing.
+   *
+   * @param value the value to check
+   * @param param name of the value, used in the error message
+   * @throws IOException if {@code value} is {@code null}
+   */
+  private static void validateInputParam(String value, String param)
+      throws IOException {
+    if (value == null) {
+      String msg = param + " is null";
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+  }
+
+  /**
+   * Shutdown hook that stops the service and removes its working directory:
+   * the per-application directory on the default file system in non-CLI
+   * mode, or the local working directory in CLI mode.
+   */
+  static class AMPoolServiceShutdownHook implements Runnable {
+    AMPoolService appMaster;
+
+    AMPoolServiceShutdownHook(AMPoolService appMaster) {
+      this.appMaster = appMaster;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("AMPoolService received a signal. Shutting down now.");
+      Configuration conf = appMaster.conf;
+      appMaster.stop();
+      // TODO fix - this is a problem with multiple attempts of this AM
+      // Client does not remain running till end of AM
+      try {
+        if (!appMaster.inCLIMode) {
+          FileSystem fs = FileSystem.get(conf);
+          Path appDirOnDFS = new Path(
+              fs.getHomeDirectory(),
+              AMPoolConfiguration.APP_NAME + "/"
+              + appMaster.applicationAttemptId.getApplicationId().toString()
+              + "/");
+          LOG.info("Deleting app dir on dfs"
+              + ", path=" + appDirOnDFS.toString());
+          fs.delete(appDirOnDFS, true);
+        } else if (appMaster.tmpAppDirPath != null) {
+          LOG.info("Deleting working dir: " + appMaster.tmpAppDirPath);
+          appMaster.localFs.delete(new Path(appMaster.tmpAppDirPath), true);
+        }
+      } catch (IOException e) {
+        // Cleanup is best effort during shutdown; record the failure in the
+        // service log instead of printing to stderr.
+        LOG.error("Failed to delete AMPoolService directory during shutdown", e);
+      }
+    }
+  }
+
+  /**
+   * Creates the service for non-CLI mode from the container id and node
+   * manager host/port found in the process environment.
+   *
+   * @param conf not used by this method
+   * @return the newly constructed service (not yet initialized)
+   * @throws IOException if any of the required environment variables is unset
+   */
+  private static AMPoolService initNonCLIAM(Configuration conf)
+      throws IOException {
+    String containerIdStr =
+        System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
+    String nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV);
+    String nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV);
+
+    validateInputParam(containerIdStr,
+        ApplicationConstants.AM_CONTAINER_ID_ENV);
+    validateInputParam(nodeHostString, ApplicationConstants.NM_HOST_ENV);
+    validateInputParam(nodePortString, ApplicationConstants.NM_PORT_ENV);
+
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    ApplicationAttemptId applicationAttemptId =
+        containerId.getApplicationAttemptId();
+
+    return new AMPoolService(applicationAttemptId, containerId,
+        nodeHostString, Integer.parseInt(nodePortString));
+  }
+
+  /**
+   * Entry point. With the {@code -cli} option the service is created in CLI
+   * mode using the local canonical host name; otherwise it is created from
+   * the process environment. A shutdown hook is registered before the
+   * service is initialized and started.
+   *
+   * @param args command-line arguments; only {@code -cli} is recognized
+   * @throws IOException if the local host cannot be resolved or a required
+   *     environment variable is missing
+   * @throws ParseException if the arguments cannot be parsed
+   */
+  public static void main(String[] args) throws IOException, ParseException {
+
+    Configuration conf = new AMPoolConfiguration(new YarnConfiguration());
+    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
+    CommandLine cliParser = new GnuParser().parse(opts, args);
+    AMPoolService appMaster = null;
+
+    String localhost = InetAddress.getLocalHost().getCanonicalHostName();
+
+    if (!cliParser.hasOption("cli")) {
+      appMaster = initNonCLIAM(conf);
+    } else {
+      appMaster = new AMPoolService(localhost);
+    }
+
+    ShutdownHookManager.get().addShutdownHook(
+        new AMPoolServiceShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
+
+    appMaster.init(conf);
+    appMaster.start();
+
+  }
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/RMHeartbeatService.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/RMHeartbeatService.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/RMHeartbeatService.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/RMHeartbeatService.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.client.AMRMClient;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * Service that registers the AMPool application master with the resource
+ * manager on start and then calls {@code AMRMClient.allocate(0)} at a fixed
+ * rate from a daemon timer thread.
+ */
+public class RMHeartbeatService extends AbstractService {
+
+  private static final Log LOG = LogFactory.getLog(RMHeartbeatService.class);
+
+  /** Delay before the first allocate call and period between calls, in ms. */
+  private long heartbeatInterval = 1000L;
+  Timer scheduleTimer;
+  RMAllocateTimerTask rmAllocateTimerTask;
+  private final AMRMClient amRmClient;
+  private final AMPoolContext context;
+  private final String trackerUrl;
+  private final int trackerPort;
+
+  /**
+   * @param context source of the {@link AMRMClient} and node manager host;
+   *     also receives the resource capabilities reported at registration
+   * @param trackerUrl tracking URL passed to the RM at registration
+   * @param trackerPort port passed to the RM at registration
+   */
+  public RMHeartbeatService(AMPoolContext context, String trackerUrl,
+      int trackerPort) {
+    super(RMHeartbeatService.class.getName());
+    this.context = context;
+    this.amRmClient = context.getAMRMClient();
+    this.trackerUrl = trackerUrl;
+    this.trackerPort = trackerPort;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+  }
+
+  /**
+   * Registers with the RM, records the minimum and maximum resource
+   * capabilities from the response in the context, and schedules the
+   * periodic allocate task.
+   *
+   * @throws RuntimeException if registration fails with a
+   *     {@link YarnRemoteException}
+   */
+  @Override
+  public void start() {
+    try {
+      LOG.info("Registering AMPoolService with RM"
+          + ", nmHost=" + context.getNMHost()
+          + ", trackerPort=" + trackerPort
+          + ", trackerUrl=" + trackerUrl);
+      RegisterApplicationMasterResponse response =
+          amRmClient.registerApplicationMaster(
+              context.getNMHost(), trackerPort, trackerUrl);
+      context.setMinResourceCapability(response.getMinimumResourceCapability());
+      context.setMaxResourceCapability(response.getMaximumResourceCapability());
+    } catch (YarnRemoteException e) {
+      throw new RuntimeException(e);
+    }
+
+    scheduleTimer = new Timer("RMAllocateTimer", true);
+    rmAllocateTimerTask = new RMAllocateTimerTask();
+    scheduleTimer.scheduleAtFixedRate(rmAllocateTimerTask, heartbeatInterval,
+        heartbeatInterval);
+    super.start();
+  }
+
+  /**
+   * Stops the allocate task, cancels the timer so its thread terminates,
+   * and lets the base class record the state change.
+   */
+  @Override
+  public void stop() {
+    if (rmAllocateTimerTask != null) {
+      rmAllocateTimerTask.stop();
+    }
+    if (scheduleTimer != null) {
+      // Cancelling only the task leaves the timer thread alive.
+      scheduleTimer.cancel();
+    }
+    super.stop();
+  }
+
+  /** Timer task issuing one allocate call per run until stopped. */
+  private class RMAllocateTimerTask extends TimerTask {
+    private volatile boolean shouldRun = true;
+
+    @Override
+    public void run() {
+      if (shouldRun) {
+        try {
+          // NOTE(review): 0 is presumably the progress indicator - confirm
+          // against the AMRMClient API in use.
+          amRmClient.allocate(0);
+        } catch (YarnRemoteException e) {
+          // Keep the timer alive; the next tick retries.
+          LOG.error("Allocate call to RM failed", e);
+        }
+      }
+    }
+
+    /** Prevents further allocate calls and cancels this task. */
+    public void stop() {
+      shouldRun = false;
+      this.cancel();
+    }
+  }
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/client/AMPoolClient.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/client/AMPoolClient.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/client/AMPoolClient.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/client/AMPoolClient.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool.client;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.v2.app2.lazy.LazyAMConfig;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.ampool.AMPoolService;
+import org.apache.tez.ampool.AMPoolConfiguration;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+public class AMPoolClient extends YarnClientImpl {
+
+ private static final Log LOG = LogFactory.getLog(AMPoolClient.class);
+
+ // Configuration
+ private Configuration conf;
+
+ private Options opts;
+
+ private String appMasterJar;
+
+ private final String appName = AMPoolConfiguration.APP_NAME;
+ private static final String appMasterMainClass = AMPoolService.class.getName();
+
+ private String tmpAppDirPath = null;
+
+ /**
+  * Creates a client and registers the command line options it understands:
+  * {@code master_memory}, {@code jar} and {@code help}.
+  */
+ public AMPoolClient() {
+   Options cliOptions = new Options();
+   cliOptions.addOption("master_memory", true,
+       "Amount of memory in MB to be requested to run the application master");
+   cliOptions.addOption("jar", true,
+       "Jar file containing the application master");
+   cliOptions.addOption("help", false, "Print usage");
+   this.opts = cliOptions;
+ }
+
+ /**
+  * Command line entry point: initializes the client, runs it and exits the
+  * JVM with a status code.
+  *
+  * <p>Exit codes: {@code 0} when only usage was requested or the run
+  * succeeded, {@code -1} for invalid arguments, {@code 1} when an unexpected
+  * error was thrown, {@code 2} when the run completed unsuccessfully.
+  *
+  * <p>The exit code is computed first and {@code System.exit} is called only
+  * after the {@code finally} block, so the local working directory is
+  * cleaned up on every path (exiting inside the try/catch would skip it).
+  *
+  * @param args Command line arguments
+  */
+ public static void main(String[] args) {
+   int exitCode = 1;
+   // Stays null unless run() returned normally.
+   Boolean runResult = null;
+   AMPoolClient amPoolClient = null;
+   try {
+     amPoolClient = new AMPoolClient();
+     LOG.info("Initializing AMPoolClient");
+     boolean doRun = false;
+     try {
+       doRun = amPoolClient.init(args);
+       exitCode = 0;
+     } catch (IllegalArgumentException e) {
+       System.err.println(e.getLocalizedMessage());
+       amPoolClient.printUsage();
+       exitCode = -1;
+     }
+     if (doRun) {
+       runResult = Boolean.valueOf(amPoolClient.run());
+       exitCode = runResult.booleanValue() ? 0 : 2;
+     }
+   } catch (Throwable t) {
+     LOG.fatal("Error running CLient", t);
+     exitCode = 1;
+   } finally {
+     if (amPoolClient != null) {
+       deleteWorkingDir(amPoolClient.tmpAppDirPath);
+     }
+   }
+   if (runResult != null) {
+     if (runResult.booleanValue()) {
+       LOG.info("Application completed successfully");
+     } else {
+       LOG.error("Application failed to complete successfully");
+     }
+   }
+   System.exit(exitCode);
+ }
+
+ /**
+  * Best-effort removal of the local working directory and the files
+  * directly inside it.
+  *
+  * @param dirPath directory to delete; {@code null} means nothing was
+  *        created and nothing is done
+  */
+ private static void deleteWorkingDir(String dirPath) {
+   if (dirPath == null) {
+     return;
+   }
+   LOG.info("Deleting working dir: " + dirPath);
+   File dir = new File(dirPath);
+   // listFiles() returns null when the path does not exist or is not a
+   // directory, e.g. when the run failed before the directory was created.
+   File[] children = dir.listFiles();
+   if (children != null) {
+     for (File child : children) {
+       if (!child.delete()) {
+         LOG.warn("Could not delete " + child.getAbsolutePath());
+       }
+     }
+   }
+   if (dir.exists() && !dir.delete()) {
+     LOG.warn("Could not delete working dir " + dirPath);
+   }
+ }
+
+ /**
+  * Builds the client configuration, initializes the parent client and
+  * parses the command line.
+  *
+  * <p>When {@code master_memory} is supplied, its value is stored in the
+  * configuration under {@code AMPoolConfiguration.AM_MASTER_MEMORY_MB},
+  * which is the key {@code run()} reads the AM memory from. Previously the
+  * option was declared but never consulted.
+  *
+  * @param args command line arguments
+  * @return {@code false} when only usage was requested and the client should
+  *         not run, {@code true} otherwise
+  * @throws IllegalArgumentException if no arguments are given, the
+  *         {@code jar} option is missing, or {@code master_memory} is not a
+  *         positive integer
+  * @throws Exception if the command line cannot be parsed
+  */
+ public boolean init(String[] args) throws Exception {
+   this.conf = new AMPoolConfiguration(new YarnConfiguration());
+   super.init(conf);
+
+   // Reject an empty command line before handing it to the parser.
+   if (args == null || args.length == 0) {
+     throw new IllegalArgumentException("No args specified");
+   }
+
+   CommandLine cliParser = new GnuParser().parse(opts, args);
+
+   if (cliParser.hasOption("help")) {
+     printUsage();
+     return false;
+   }
+
+   if (!cliParser.hasOption("jar")) {
+     throw new IllegalArgumentException(
+         "No jar file specified for application master");
+   }
+   appMasterJar = cliParser.getOptionValue("jar");
+
+   if (cliParser.hasOption("master_memory")) {
+     String memoryArg = cliParser.getOptionValue("master_memory");
+     int masterMemoryMb;
+     try {
+       masterMemoryMb = Integer.parseInt(memoryArg.trim());
+     } catch (NumberFormatException e) {
+       throw new IllegalArgumentException(
+           "Invalid value for master_memory: " + memoryArg, e);
+     }
+     if (masterMemoryMb <= 0) {
+       throw new IllegalArgumentException(
+           "master_memory must be a positive number of MB, got "
+           + masterMemoryMb);
+     }
+     conf.setInt(AMPoolConfiguration.AM_MASTER_MEMORY_MB, masterMemoryMb);
+   }
+
+   return true;
+ }
+
+ /**
+  * Prints the supported command line options to standard output.
+  *
+  * <p>The formatter is referenced by its fully qualified name because the
+  * file imports other classes of {@code org.apache.commons.cli} but not
+  * this one.
+  */
+ private void printUsage() {
+   new org.apache.commons.cli.HelpFormatter().printHelp("AMPoolClient", opts);
+ }
+
+ public boolean run() throws IOException {
+ LOG.info("Running AMPoolClient");
+ start();
+
+ // Get a new application id
+ GetNewApplicationResponse newApp = super.getNewApplication();
+ ApplicationId appId = newApp.getApplicationId();
+ System.out.println("Starting AMPoolMaster with Application ID: "
+ + appId);
+
+ int minMem = newApp.getMinimumResourceCapability().getMemory();
+ int maxMem = newApp.getMaximumResourceCapability().getMemory();
+ LOG.info("Min mem capabililty of resources in this cluster " + minMem);
+ LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+ int amMemory = conf.getInt(AMPoolConfiguration.AM_MASTER_MEMORY_MB,
+ AMPoolConfiguration.DEFAULT_AM_MASTER_MEMORY_MB);
+
+ // Create launch context for app master
+ LOG.info("Setting up application submission context for ASM");
+ ApplicationSubmissionContext appContext =
+ Records.newRecord(ApplicationSubmissionContext.class);
+
+ appContext.setApplicationId(appId);
+ appContext.setApplicationName(appName);
+
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = Records.newRecord(
+ ContainerLaunchContext.class);
+
+ // set local resources for the application master
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
+
+ LOG.info("Copy App Master jar from local filesystem"
+ + " and add to local environment");
+ FileSystem fs = FileSystem.get(conf);
+ Path src = new Path(appMasterJar);
+ String pathSuffix = appName + "/" + appId.toString() + "/AMPoolService.jar";
+ Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
+ fs.copyFromLocalFile(false, true, src, dst);
+ FileStatus destStatus = fs.getFileStatus(dst);
+ LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+
+ amJarRsrc.setType(LocalResourceType.FILE);
+ amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+ amJarRsrc.setTimestamp(destStatus.getModificationTime());
+ amJarRsrc.setSize(destStatus.getLen());
+ localResources.put("AMPoolService.jar", amJarRsrc);
+
+ tmpAppDirPath = conf.get(AMPoolConfiguration.TMP_DIR_PATH,
+ AMPoolConfiguration.DEFAULT_TMP_DIR_PATH)
+ + File.separator + appId.toString() + File.separator;
+ File appDir = new File(tmpAppDirPath);
+ appDir.deleteOnExit();
+
+ LOG.info("Writing contents of LazyAMConfig to local fs");
+
+ Configuration lazyAMConfToUpload = new LazyAMConfig();
+ File lazyAmConfFile = new File(tmpAppDirPath,
+ LazyAMConfig.LAZY_AM_JOB_XML_FILE);
+ LOG.info("Writing contents of LazyAMConfig to local fs"
+ + ", path=" + lazyAmConfFile.getAbsolutePath());
+ lazyAmConfFile.getParentFile().mkdirs();
+ lazyAMConfToUpload.writeXml(new FileOutputStream(lazyAmConfFile));
+
+ Path lazyAmConfPathOnDfs = new Path(
+ fs.getHomeDirectory(),
+ appName + "/" + appId.toString() + "/"
+ + LazyAMConfig.LAZY_AM_JOB_XML_FILE);
+
+ LOG.info("Uploading contents of LazyAMConfig to dfs"
+ + ", srcpath=" + lazyAmConfFile.getAbsolutePath()
+ + ", destpath=" + lazyAmConfPathOnDfs.toString());
+ fs.copyFromLocalFile(false, true,
+ new Path(lazyAmConfFile.getAbsolutePath()), lazyAmConfPathOnDfs);
+ fs.getFileStatus(lazyAmConfPathOnDfs);
+
+ Configuration amPoolConfToUpload = new AMPoolConfiguration();
+ amPoolConfToUpload.set(
+ AMPoolConfiguration.LAZY_AM_CONF_FILE_PATH,
+ lazyAmConfPathOnDfs.toString());
+
+ File amPoolConfFile = new File(tmpAppDirPath,
+ AMPoolConfiguration.AMPOOL_APP_XML_FILE);
+ amPoolConfFile.getParentFile().mkdirs();
+ LOG.info("Writing contents of AmPoolConf to local fs"
+ + ", path=" + amPoolConfFile.getAbsolutePath());
+ amPoolConfToUpload.writeXml(new FileOutputStream(amPoolConfFile));
+
+ Path amPoolConfPathOnDfs = new Path(
+ fs.getHomeDirectory(),
+ appName + "/" + appId.toString() + "/"
+ + AMPoolConfiguration.AMPOOL_APP_XML_FILE);
+ LOG.info("Uploading contents of AmPoolConfig to dfs"
+ + ", srcpath=" + amPoolConfFile.getAbsolutePath()
+ + ", destpath=" + amPoolConfPathOnDfs.toString());
+ fs.copyFromLocalFile(false, true,
+ new Path(amPoolConfFile.getAbsolutePath()),
+ amPoolConfPathOnDfs);
+ FileStatus amPoolConfFileStatus =
+ fs.getFileStatus(amPoolConfPathOnDfs);
+
+ LocalResource amPoolConfRsrc = Records.newRecord(LocalResource.class);
+ amPoolConfRsrc.setType(LocalResourceType.FILE);
+ amPoolConfRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ amPoolConfRsrc.setResource(ConverterUtils.getYarnUrlFromPath(
+ amPoolConfPathOnDfs));
+ amPoolConfRsrc.setTimestamp(amPoolConfFileStatus.getModificationTime());
+ amPoolConfRsrc.setSize(amPoolConfFileStatus.getLen());
+ LOG.info("Adding AMPoolConfig as a local resource for AMPoolServiceAM");
+ localResources.put(AMPoolConfiguration.AMPOOL_APP_XML_FILE,
+ amPoolConfRsrc);
+
+ amContainer.setLocalResources(localResources);
+
+ // Set the env variables to be setup in the env
+ // where the application master will be run
+ LOG.info("Set the environment for the application master");
+ Map<String, String> env = new HashMap<String, String>();
+
+ StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:.");
+ for (String c : conf.getStrings(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ classPathEnv.append(':');
+ classPathEnv.append(c.trim());
+ }
+
+ for (String c : conf.getStrings(
+ MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
+ MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH)) {
+ classPathEnv.append(':');
+ classPathEnv.append(c.trim());
+ }
+
+ for (String c : conf.getStrings(
+ AMPoolConfiguration.AMPOOL_APPLICATION_CLASSPATH,
+ AMPoolConfiguration.DEFAULT_AMPOOL_APPLICATION_CLASSPATH)) {
+ classPathEnv.append(':');
+ classPathEnv.append(c.trim());
+ }
+
+ // add the runtime classpath needed for tests to work
+ if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+ classPathEnv.append(':');
+ classPathEnv.append(System.getProperty("java.class.path"));
+ }
+
+ env.put("CLASSPATH", classPathEnv.toString());
+
+ amContainer.setEnvironment(env);
+
+ // Set the necessary command to execute the application master
+ Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+ // Set java executable command
+ LOG.info("Setting up app master command");
+ vargs.add("${JAVA_HOME}" + "/bin/java");
+ // Set Xmx based on am memory size
+ int normalizeMem = (int)(amMemory * 0.75);
+ vargs.add("-Xmx" + normalizeMem + "m");
+ // Set class name
+ vargs.add(appMasterMainClass);
+
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ + "/AMPoolService.stdout");
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ + "/AMPoolService.stderr");
+
+ // Get final commmand
+ StringBuilder command = new StringBuilder();
+ for (CharSequence str : vargs) {
+ command.append(str).append(" ");
+ }
+
+ LOG.info("Completed setting up app master command " + command.toString());
+ List<String> commands = new ArrayList<String>();
+ commands.add(command.toString());
+ amContainer.setCommands(commands);
+
+ // Set up resource type requirements
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(amMemory);
+ amContainer.setResource(capability);
+
+ appContext.setAMContainerSpec(amContainer);
+
+ // Set the queue to which this application is to be submitted in the RM
+ String amQueue = conf.get(AMPoolConfiguration.AM_MASTER_QUEUE,
+ AMPoolConfiguration.DEFAULT_AM_MASTER_QUEUE);
+ appContext.setQueue(amQueue);
+
+ // Submit the application to the applications manager
+ LOG.info("Submitting application to ASM");
+ super.submitApplication(appContext);
+
+ LOG.info("Monitoring application, appId="
+ + appId);
+ while (true) {
+ ApplicationReport report = super.getApplicationReport(appId);
+ if (report.getFinalApplicationStatus().equals(
+ FinalApplicationStatus.FAILED)
+ || report.getFinalApplicationStatus().equals(
+ FinalApplicationStatus.KILLED)) {
+ LOG.fatal("AMPoolService failed to start up");
+ return false;
+ }
+ else if (report.getYarnApplicationState().equals(
+ YarnApplicationState.RUNNING)) {
+ LOG.info("AMPoolService running on"
+ + ", host=" + report.getHost()
+ + ", trackingUrl=" + report.getTrackingUrl());
+ break;
+ }
+ else if (report.getFinalApplicationStatus().equals(
+ FinalApplicationStatus.SUCCEEDED)) {
+ LOG.warn("AMPoolService shutdown");
+ return false;
+ }
+ }
+ return true;
+ }
+
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMFinishedEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMFinishedEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMFinishedEvent.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMFinishedEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool.manager;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+
+/**
+ * Pool event of type {@link AMPoolEventType#AM_FINISHED}. It carries the id
+ * of the application together with its final status.
+ */
+public class AMFinishedEvent extends AMPoolEvent {
+
+  /** Final status of the application this event is about. */
+  private final FinalApplicationStatus finalApplicationStatus;
+
+  /** Id of the application this event is about. */
+  private final ApplicationId applicationId;
+
+  /**
+   * @param applicationId id of the application
+   * @param finalApplicationStatus final status of the application
+   */
+  public AMFinishedEvent(ApplicationId applicationId,
+      FinalApplicationStatus finalApplicationStatus) {
+    super(AMPoolEventType.AM_FINISHED);
+    this.finalApplicationStatus = finalApplicationStatus;
+    this.applicationId = applicationId;
+  }
+
+  /** @return the final status passed to the constructor */
+  public FinalApplicationStatus getFinalApplicationStatus() {
+    return this.finalApplicationStatus;
+  }
+
+  /** @return the application id passed to the constructor */
+  public ApplicationId getApplicationId() {
+    return this.applicationId;
+  }
+
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMLaunchedEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMLaunchedEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMLaunchedEvent.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMLaunchedEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool.manager;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * Pool event of type {@link AMPoolEventType#AM_LAUNCHED}. It carries the id
+ * of the launched application.
+ */
+public class AMLaunchedEvent extends AMPoolEvent {
+
+  /** Id of the application this event is about. */
+  private final ApplicationId applicationId;
+
+  /**
+   * @param applicationId id of the launched application
+   */
+  public AMLaunchedEvent(ApplicationId applicationId) {
+    super(AMPoolEventType.AM_LAUNCHED);
+    this.applicationId = applicationId;
+  }
+
+  /** @return the application id passed to the constructor */
+  public ApplicationId getApplicationId() {
+    return this.applicationId;
+  }
+
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolEvent.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool.manager;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Base event for the AM pool; its type is one of the
+ * {@link AMPoolEventType} constants.
+ */
+public class AMPoolEvent extends AbstractEvent<AMPoolEventType> {
+
+  /**
+   * @param type the kind of pool event
+   */
+  public AMPoolEvent(final AMPoolEventType type) {
+    super(type);
+  }
+
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolEventType.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolEventType.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolEventType.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool.manager;
+
+/**
+ * Kinds of events carried by {@code AMPoolEvent}.
+ */
+public enum AMPoolEventType {
+  /** Request to launch a new AM. */
+  LAUNCH_AM,
+  /** An AM has finished. */
+  AM_FINISHED,
+  /** An AM has been launched. */
+  AM_LAUNCHED
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolManager.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolManager.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolManager.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolManager.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool.manager;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.tez.ampool.AMContext;
+import org.apache.tez.ampool.AMLauncher;
+import org.apache.tez.ampool.AMMonitorEvent;
+import org.apache.tez.ampool.AMMonitorEventType;
+import org.apache.tez.ampool.AMPoolConfiguration;
+import org.apache.tez.ampool.mr.MRAMLauncher;
+
+public class AMPoolManager extends CompositeService
+ implements EventHandler<AMPoolEvent> {
+
+ private static final Log LOG = LogFactory.getLog(AMPoolManager.class);
+
+ private AMLauncher amLauncher;
+
+ private final Dispatcher dispatcher;
+
+ private int numAMs;
+
+ private int maxNumAMs;
+
+ private AtomicInteger pendingLaunches;
+ private AtomicInteger failedLaunches;
+
+ private ConcurrentHashMap<ApplicationId, AMContext> appContexts;
+
+ private LinkedList<AMContext> unassignedAppContexts;
+
+ private YarnClient yarnClient;
+ private final String nmHost;
+
+ private String pollingUrl;
+ private boolean launchAMOnCompletion = true;
+ private int maxLaunchFailures = 100;
+ private final String lazyAMConfigPath;
+ private final String tmpAppDirPath;
+ private final boolean inCLIMode;
+
+ /**
+  * Creates the manager with empty bookkeeping structures. Pool sizing and
+  * the launcher are configured later in {@link #init(Configuration)}.
+  *
+  * @param conf configuration; only the LazyAM configuration path is read here
+  * @param dispatcher dispatcher through which pool and monitor events are sent
+  * @param yarnClient client handed to the AM launcher
+  * @param nmHost host name used to build the polling URL
+  * @param inCLIMode flag forwarded to the AM launcher
+  * @param tmpAppDirPath local directory path forwarded to the AM launcher
+  */
+ public AMPoolManager(Configuration conf, Dispatcher dispatcher, YarnClient yarnClient,
+     String nmHost, boolean inCLIMode, String tmpAppDirPath) {
+   super(AMPoolManager.class.getName());
+
+   // Collaborators and fixed settings.
+   this.dispatcher = dispatcher;
+   this.nmHost = nmHost;
+
+   // Bookkeeping, initially empty.
+   this.appContexts = new ConcurrentHashMap<ApplicationId, AMContext>();
+   this.unassignedAppContexts = new LinkedList<AMContext>();
+   this.pendingLaunches = new AtomicInteger();
+   this.failedLaunches = new AtomicInteger();
+
+   this.yarnClient = yarnClient;
+   String lazyConfigPath = conf.get(AMPoolConfiguration.LAZY_AM_CONF_FILE_PATH);
+   this.lazyAMConfigPath = lazyConfigPath;
+   this.inCLIMode = inCLIMode;
+   this.tmpAppDirPath = tmpAppDirPath;
+ }
+
+ /**
+  * Reads the pool settings from the configuration, validates them, creates
+  * the AM launcher and initializes the parent composite service.
+  *
+  * @param conf configuration providing pool size, maximum pool size,
+  *        launch-on-completion flag, maximum launch failures and the web
+  *        service port
+  * @throws IllegalArgumentException if the pool size or maximum pool size is
+  *         not positive, the pool size exceeds the maximum, or the maximum
+  *         number of launch failures is negative
+  * @throws RuntimeException if the launcher cannot be created; the original
+  *         failure is attached as the cause
+  */
+ @Override
+ public void init(Configuration conf) {
+   LOG.info("Initializing AMPoolManager");
+   numAMs = conf.getInt(AMPoolConfiguration.AM_POOL_SIZE,
+       AMPoolConfiguration.DEFAULT_AM_POOL_SIZE);
+   maxNumAMs = conf.getInt(AMPoolConfiguration.MAX_AM_POOL_SIZE,
+       AMPoolConfiguration.DEFAULT_MAX_AM_POOL_SIZE);
+   launchAMOnCompletion = conf.getBoolean(
+       AMPoolConfiguration.AM_LAUNCH_NEW_AM_AFTER_APP_COMPLETION,
+       AMPoolConfiguration.DEFAULT_AM_LAUNCH_NEW_AM_AFTER_APP_COMPLETION);
+   maxLaunchFailures = conf.getInt(
+       AMPoolConfiguration.MAX_AM_LAUNCH_FAILURES,
+       AMPoolConfiguration.DEFAULT_MAX_AM_LAUNCH_FAILURES);
+
+   this.pollingUrl = "http://" + nmHost
+       + ":" + conf.getInt(AMPoolConfiguration.WS_PORT,
+           AMPoolConfiguration.DEFAULT_WS_PORT)
+       + "/applications/poll/";
+
+   LOG.info("Initializing AMPoolManager"
+       + ", poolSize=" + numAMs
+       + ", maxPoolSize=" + maxNumAMs
+       + ", launchAMOnCompletion=" + launchAMOnCompletion
+       + ", pollingUrlForAMs=" + pollingUrl
+       + ", lazyMRAMConfig=" + this.lazyAMConfigPath);
+
+   if (numAMs <= 0
+       || maxNumAMs <= 0
+       || numAMs > maxNumAMs
+       || maxLaunchFailures < 0) {
+     throw new IllegalArgumentException("Invalid configuration values"
+         + " specified"
+         + ", poolSize=" + numAMs
+         + ", maxPoolSize=" + maxNumAMs
+         + ", maxLaunchFailures=" + maxLaunchFailures);
+   }
+
+   try {
+     amLauncher = new MRAMLauncher(conf, dispatcher, yarnClient,
+         pollingUrl, lazyAMConfigPath, inCLIMode, tmpAppDirPath);
+     // Register the launcher as a child service so the composite service
+     // manages it together with this manager.
+     if (amLauncher instanceof Service) {
+       addService((Service) amLauncher);
+     }
+   } catch (Exception e) {
+     // Keep the cause so the reason for the failure is not lost.
+     throw new RuntimeException("Could not initialize launcher", e);
+   }
+   super.init(conf);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void start() {
+ LOG.info("Starting AMPoolManager");
+ super.start();
+ for (int i = 0; i < numAMs; ++i) {
+ AMPoolEvent e = new AMPoolEvent(AMPoolEventType.LAUNCH_AM);
+ this.dispatcher.getEventHandler().handle(e);
+ }
+ }
+
+ /**
+  * Logs the shutdown and stops the parent composite service.
+  */
+ @Override
+ public void stop() {
+   LOG.info("Stopping AMPoolManager");
+   super.stop();
+ }
+
+ /**
+  * Hands out the oldest unassigned AM, moving its context to the assigned
+  * set. When {@code launchAMOnCompletion} is disabled, a {@code LAUNCH_AM}
+  * event is dispatched right away.
+  *
+  * @return the application id of the assigned AM, or {@code null} when no
+  *         unassigned AM is available
+  */
+ @SuppressWarnings("unchecked")
+ public synchronized ApplicationId getAM() {
+   if (unassignedAppContexts.isEmpty()) {
+     LOG.warn("Did not find any unassigned AMs to allocate for new job"
+         + ", pendingLaunchCount=" + pendingLaunches.get()
+         + ", currentApplicationCount=" + appContexts.size()
+         + ", unassignedAMsCount=" + unassignedAppContexts.size());
+     return null;
+   }
+
+   AMContext assigned = unassignedAppContexts.removeFirst();
+   ApplicationId assignedId = assigned.getApplicationId();
+   LOG.info("Assigning new application id to new application"
+       + ", applicationId=" + assignedId);
+   appContexts.put(assignedId, assigned);
+
+   if (launchAMOnCompletion) {
+     return assignedId;
+   }
+
+   LOG.info("Launching new AM as assigned app to new application"
+       + ", assignedAppId=" + assignedId);
+   AMPoolEvent launchRequest = new AMPoolEvent(AMPoolEventType.LAUNCH_AM);
+   this.dispatcher.getEventHandler().handle(launchRequest);
+   return assignedId;
+ }
+
+ /**
+  * Placeholder for killing an AM; not implemented, so calling it has no
+  * effect.
+  *
+  * @param applicationId id of the application whose AM should be killed
+  */
+ public synchronized void killAM(ApplicationId applicationId) {
+   // TODO: not implemented yet
+ }
+
+ /**
+  * Attaches a client's submission context to the assigned AM with the same
+  * application id and records the submission time.
+  *
+  * @param request submission request whose context names the application
+  * @throws IllegalArgumentException if the application id in the request
+  *         does not belong to an assigned AM (previously this surfaced as
+  *         an unexplained {@code NullPointerException})
+  */
+ public synchronized void submitApplication(SubmitApplicationRequest request) {
+   ApplicationId appId =
+       request.getApplicationSubmissionContext().getApplicationId();
+   LOG.info("Received an ApplicationSubmissionContext from client for"
+       + " application="
+       + appId.toString());
+   AMContext amContext = appContexts.get(appId);
+   if (amContext == null) {
+     throw new IllegalArgumentException(
+         "Cannot submit application, no assigned AM found for application="
+         + appId);
+   }
+   amContext.setSubmissionContext(request.getApplicationSubmissionContext());
+   amContext.setApplicationSubmissionTime(System.currentTimeMillis());
+ }
+
+ /**
+  * Dispatches a pool event to the handler for its type. The whole call runs
+  * under this object's lock.
+  *
+  * @param event the pool event to process
+  */
+ @Override
+ public synchronized void handle(AMPoolEvent event) {
+   switch (event.getType()) {
+   case LAUNCH_AM:
+     onLaunchRequested();
+     break;
+   case AM_FINISHED:
+     onAMFinished((AMFinishedEvent) event);
+     break;
+   case AM_LAUNCHED:
+     onAMLaunched((AMLaunchedEvent) event);
+     break;
+   }
+ }
+
+ /**
+  * Launches one more AM unless pending, assigned and unassigned AMs
+  * together already reach the maximum pool size.
+  */
+ private void onLaunchRequested() {
+   int poolSize = pendingLaunches.get()
+       + appContexts.size()
+       + unassignedAppContexts.size();
+   if (poolSize >= maxNumAMs) {
+     LOG.info("Hitting AM pool limits, ignoring launch event"
+         + ", pendingLaunchCount=" + pendingLaunches.get()
+         + ", currentApplicationCount=" + appContexts.size()
+         + ", unassignedAMsCount=" + unassignedAppContexts.size());
+     return;
+   }
+   pendingLaunches.incrementAndGet();
+   amLauncher.launchAM();
+ }
+
+ /**
+  * Accounts for a finished AM: counts failures of unassigned AMs, forgets
+  * the application and requests a replacement when the pool needs one.
+  *
+  * @throws RuntimeException if the number of failed unassigned AMs exceeds
+  *         {@code maxLaunchFailures}
+  */
+ @SuppressWarnings("unchecked")
+ private void onAMFinished(AMFinishedEvent finished) {
+   ApplicationId finishedId = finished.getApplicationId();
+   boolean wasManaged = isManagedApp(finishedId);
+   LOG.info("Received an AM finished event for application"
+       + ", application=" + finishedId
+       + ", isManaged=" + wasManaged);
+
+   boolean succeeded = finished.getFinalApplicationStatus().equals(
+       FinalApplicationStatus.SUCCEEDED);
+   if (!wasManaged && !succeeded) {
+     int failureCount = failedLaunches.incrementAndGet();
+     LOG.info("Unassigned AM failed."
+         + ", totalFailedCount=" + failureCount);
+     if (failureCount > maxLaunchFailures) {
+       throw new RuntimeException("Getting too many failed launches"
+           + ", failure count exceeded " + maxLaunchFailures
+           + ", exiting");
+     }
+   }
+
+   removeAppId(finishedId);
+   int remainingAMs = pendingLaunches.get()
+       + appContexts.size()
+       + unassignedAppContexts.size();
+   boolean replace = launchAMOnCompletion
+       || !wasManaged
+       || remainingAMs < numAMs;
+   if (!replace) {
+     return;
+   }
+   LOG.info("Launching new AM on receiving finish event"
+       + ", finishedAppId=" + finishedId
+       + ", launchOnCompletion=" + launchAMOnCompletion
+       + ", finishedAMWasManaged=" + wasManaged
+       + ", currentAMCount=" + remainingAMs
+       + ", minAMCount=" + numAMs);
+   AMPoolEvent launchRequest = new AMPoolEvent(AMPoolEventType.LAUNCH_AM);
+   this.dispatcher.getEventHandler().handle(launchRequest);
+ }
+
+ /**
+  * Records a newly launched AM as unassigned, decrements the pending count
+  * and asks for the AM to be monitored.
+  */
+ @SuppressWarnings("unchecked")
+ private void onAMLaunched(AMLaunchedEvent launched) {
+   ApplicationId launchedId = launched.getApplicationId();
+   unassignedAppContexts.add(new AMContext(launchedId));
+   int stillPending = pendingLaunches.decrementAndGet();
+   LOG.info("AM launched for applicationId="
+       + launched.getApplicationId()
+       + ", currentPending=" + stillPending);
+   AMMonitorEvent monitorRequest = new AMMonitorEvent(
+       AMMonitorEventType.AM_MONITOR_START,
+       launched.getApplicationId());
+   this.dispatcher.getEventHandler().handle(monitorRequest);
+ }
+
+ public synchronized boolean isManagedApp(ApplicationId applicationId) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Managed app contexts size=" + appContexts.size());
+ for (ApplicationId appId : appContexts.keySet()) {
+ LOG.debug("Dumping appContexts, appId=" + appId.toString());
+ }
+ }
+ return appContexts.containsKey(applicationId);
+ }
+
+ /**
+  * Looks up the context for the AM attempt that is asking for work.
+  *
+  * <p>For an assigned AM the attempt id is recorded on its context, and the
+  * job pick-up time is set when a submission context is already attached.
+  *
+  * @param applicationAttemptId attempt id of the requesting AM
+  * @return the assigned context, or {@code null} when the AM is known but
+  *         still unassigned
+  * @throws RuntimeException if the application is neither assigned nor
+  *         unassigned
+  */
+ public synchronized AMContext getAMContext(
+     ApplicationAttemptId applicationAttemptId) {
+   LOG.info("Received a getSubmissionContext request from"
+       + " applicationAttemptId=" + applicationAttemptId.toString());
+   ApplicationId requestedId = applicationAttemptId.getApplicationId();
+
+   AMContext assigned = appContexts.get(requestedId);
+   if (assigned != null) {
+     LOG.info("Received a getSubmissionContext request from"
+         + " applicationAttemptId=" + applicationAttemptId.toString()
+         + ", assigning job");
+     assigned.setCurrentApplicationAttemptId(applicationAttemptId);
+     if (assigned.getSubmissionContext() != null) {
+       assigned.setJobPickUpTime(System.currentTimeMillis());
+     }
+     return assigned;
+   }
+
+   boolean knownUnassigned = false;
+   for (AMContext candidate : unassignedAppContexts) {
+     if (candidate.getApplicationId().equals(requestedId)) {
+       knownUnassigned = true;
+       break;
+     }
+   }
+   if (!knownUnassigned) {
+     throw new RuntimeException("Could not find application id");
+   }
+
+   LOG.info("Received a getSubmissionContext request from"
+       + " applicationAttemptId=" + applicationAttemptId.toString()
+       + ", no job to assign");
+   return null;
+ }
+
+ /**
+  * @return the current value of the pending launch counter
+  */
+ public int getPendingLaunches() {
+   return pendingLaunches.intValue();
+ }
+
+ /**
+  * @return the current value of the failed launch counter
+  */
+ public int getFailedLaunches() {
+   return failedLaunches.intValue();
+ }
+
+ /**
+  * Returns a read-only snapshot of the unassigned AM contexts.
+  *
+  * <p>The copy is taken while holding this object's lock. Returning a view
+  * of the live list would let callers iterate it after the lock is
+  * released, while {@code handle} or {@code getAM} modify it.
+  *
+  * @return unmodifiable copy of the unassigned contexts, in list order
+  */
+ public synchronized List<AMContext> getUnassignedApplications() {
+   return Collections.unmodifiableList(
+       new LinkedList<AMContext>(unassignedAppContexts));
+ }
+
+ public synchronized Map<ApplicationId, AMContext> getApplicationsDump() {
+ return Collections.unmodifiableMap(appContexts);
+ }
+
+ /**
+  * Forgets an application: removes it from the assigned map and removes the
+  * first matching context from the unassigned list.
+  *
+  * <p>The list is walked once with an iterator; indexed access on a linked
+  * list would traverse from the head for every element.
+  *
+  * @param appId id of the application to remove
+  */
+ private synchronized void removeAppId(ApplicationId appId) {
+   appContexts.remove(appId);
+   java.util.Iterator<AMContext> it = unassignedAppContexts.iterator();
+   while (it.hasNext()) {
+     if (it.next().getApplicationId().equals(appId)) {
+       it.remove();
+       break;
+     }
+   }
+ }
+
+ public synchronized AMContext getAMContext(ApplicationId applicationId) {
+ return appContexts.get(applicationId);
+ }
+
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/mr/MRAMLauncher.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/mr/MRAMLauncher.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/mr/MRAMLauncher.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/mr/MRAMLauncher.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool.mr;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app2.lazy.LazyAMConfig;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.ampool.AMLauncher;
+import org.apache.tez.ampool.AMPoolConfiguration;
+import org.apache.tez.ampool.manager.AMLaunchedEvent;
+
+ /**
+  * {@link AMLauncher} that submits lazy MapReduce ApplicationMasters to
+  * YARN.
+  *
+  * Each {@link #launchAM()} call hands a task to a cached thread pool. The
+  * task asks YARN for a new application id, builds an
+  * {@link ApplicationSubmissionContext} for it and submits it, then posts
+  * an {@link AMLaunchedEvent} to the dispatcher.
+  *
+  * In CLI mode {@link #start()} additionally writes a {@link LazyAMConfig}
+  * to the local temp directory and uploads it to the user's staging
+  * directory; {@link #stop()} deletes the uploaded copy again.
+  */
+ public class MRAMLauncher extends AbstractService
+     implements AMLauncher {
+
+   private static final Log LOG = LogFactory.getLog(MRAMLauncher.class);
+
+   /** File context used to stat the files that are localized for the AM. */
+   private final FileContext defaultFileContext;
+
+   /** Receives an {@link AMLaunchedEvent} for each launch task. */
+   private final Dispatcher dispatcher;
+
+   private YarnClient yarnClient;
+
+   /** Runs launch tasks asynchronously; created in {@link #init}. */
+   private ExecutorService executorService;
+
+   private Configuration conf;
+
+   /** Exported to the AM via the LAZY_AM_POLLING_URL_ENV variable. */
+   private final String pollingUrl;
+
+   /** Path of the lazy AM config; overwritten by start() in CLI mode. */
+   private String lazyAMConfigPathStr;
+
+   private final boolean inCLIMode;
+
+   /** Local directory the lazy AM config is written to in CLI mode. */
+   private final String tmpAppDirPath;
+
+   /** Short name of the user that created this launcher. */
+   private final String user;
+
+   /** Location of the uploaded lazy AM config; set by start() in CLI mode. */
+   private Path lazyAmConfPathOnDfs;
+
+   private FileSystem fs = null;
+
+   /**
+    * Creates a launcher.
+    *
+    * @param conf configuration used to obtain the default file context
+    * @param dispatcher dispatcher that receives launch events
+    * @param yarnClient client used to create and submit applications
+    * @param pollingUrl URL handed to launched AMs through their environment
+    * @param lazyAMConfigPath path of the lazy AM config to localize
+    * @param inCLIMode whether start() should generate and upload the config
+    * @param tmpAppDirPath local directory for the generated config
+    * @throws Exception if the file context or current user cannot be
+    *         obtained
+    */
+   public MRAMLauncher(Configuration conf, Dispatcher dispatcher,
+       YarnClient yarnClient, String pollingUrl, String lazyAMConfigPath,
+       boolean inCLIMode, String tmpAppDirPath)
+       throws Exception {
+     super(MRAMLauncher.class.getName());
+     this.defaultFileContext = FileContext.getFileContext(conf);
+     this.dispatcher = dispatcher;
+     this.yarnClient = yarnClient;
+     this.pollingUrl = pollingUrl;
+     this.lazyAMConfigPathStr = lazyAMConfigPath;
+     this.inCLIMode = inCLIMode;
+     this.tmpAppDirPath = tmpAppDirPath;
+     this.user = UserGroupInformation.getCurrentUser().getShortUserName();
+   }
+
+   /**
+    * Creates the launch thread pool, remembers the configuration and opens
+    * the file system before delegating to the superclass.
+    *
+    * @param conf configuration used for all later launches
+    * @throws RuntimeException if the file system cannot be obtained
+    */
+   @Override
+   public void init(Configuration conf) {
+     executorService = Executors.newCachedThreadPool();
+     this.conf = conf;
+     try {
+       fs = FileSystem.get(conf);
+     } catch (IOException e) {
+       throw new RuntimeException(e);
+     }
+     super.init(conf);
+   }
+
+   /**
+    * In CLI mode, writes a fresh {@link LazyAMConfig} to the local temp
+    * directory and uploads it to the user's staging directory. Does nothing
+    * outside CLI mode.
+    *
+    * @throws RuntimeException if writing or uploading the config fails
+    */
+   @Override
+   public void start() {
+     // NOTE(review): super.start() is never invoked, so the AbstractService
+     // state is presumably not advanced by this method - confirm whether
+     // that is intended.
+     if (!inCLIMode) {
+       return;
+     }
+     Configuration lazyAMConfToUpload = new LazyAMConfig();
+     File lazyAmConfFile = new File(tmpAppDirPath,
+         LazyAMConfig.LAZY_AM_JOB_XML_FILE);
+     LOG.info("Writing contents of LazyAMConfig to local fs"
+         + ", path=" + lazyAmConfFile.getAbsolutePath());
+     File parentDir = lazyAmConfFile.getParentFile();
+     if (parentDir != null) {
+       parentDir.mkdirs();
+     }
+     try {
+       // Close the stream before uploading so the descriptor is not leaked.
+       FileOutputStream confOut = new FileOutputStream(lazyAmConfFile);
+       try {
+         lazyAMConfToUpload.writeXml(confOut);
+       } finally {
+         confOut.close();
+       }
+
+       String stagingDir = conf.get(AMPoolConfiguration.AM_STAGING_DIR,
+           AMPoolConfiguration.DEFAULT_AM_STAGING_DIR)
+           + Path.SEPARATOR + user + Path.SEPARATOR;
+
+       lazyAmConfPathOnDfs = new Path(stagingDir,
+           LazyAMConfig.LAZY_AM_JOB_XML_FILE);
+       this.lazyAMConfigPathStr = lazyAmConfPathOnDfs.toString();
+
+       // Reuse the file system opened by init() so that stop() cleans up
+       // through the same handle.
+       if (fs == null) {
+         fs = FileSystem.get(conf);
+       }
+       LOG.info("Uploading contents of LazyAMConfig to dfs"
+           + ", srcpath=" + lazyAmConfFile.getAbsolutePath()
+           + ", destpath=" + lazyAmConfPathOnDfs.toString());
+       fs.copyFromLocalFile(false, true,
+           new Path(lazyAmConfFile.getAbsolutePath()), lazyAmConfPathOnDfs);
+       // Fails with an IOException if the uploaded file cannot be stat'ed.
+       fs.getFileStatus(lazyAmConfPathOnDfs);
+     } catch (IOException e) {
+       throw new RuntimeException("Failed to setup lazy am config in DFS", e);
+     }
+   }
+
+   /**
+    * Shuts down the launch thread pool and, in CLI mode, deletes the lazy AM
+    * config uploaded by {@link #start()}. A failed delete is logged and
+    * otherwise ignored.
+    */
+   @Override
+   public void stop() {
+     // NOTE(review): super.stop() is never invoked - confirm whether that
+     // is intended.
+     if (executorService != null) {
+       executorService.shutdownNow();
+     }
+     if (inCLIMode && fs != null
+         && lazyAmConfPathOnDfs != null) {
+       try {
+         fs.delete(lazyAmConfPathOnDfs, true);
+       } catch (IOException e) {
+         LOG.warn("Failed to delete lazy am config from dfs"
+             + ", path=" + lazyAmConfPathOnDfs, e);
+       }
+     }
+   }
+
+   /** Queues the launch of one new AM on the thread pool. */
+   @Override
+   public synchronized void launchAM() {
+     LOG.info("Launching new AM");
+     executorService.execute(
+         new MRAMLauncherThread(dispatcher, yarnClient, conf));
+   }
+
+   /** Task that creates and submits a single AM application. */
+   private class MRAMLauncherThread implements Runnable {
+
+     private final Dispatcher dispatcher;
+     private final YarnClient yarnClient;
+     private final Configuration conf;
+
+     public MRAMLauncherThread(Dispatcher dispatcher,
+         YarnClient yarnClient,
+         Configuration conf) {
+       this.dispatcher = dispatcher;
+       this.yarnClient = yarnClient;
+       this.conf = conf;
+     }
+
+     /**
+      * Obtains a new application id, submits the AM for it and posts an
+      * {@link AMLaunchedEvent} if an id was obtained.
+      */
+     @SuppressWarnings("unchecked")
+     @Override
+     public void run() {
+       ApplicationId newAppId = null;
+       try {
+         newAppId = yarnClient.getNewApplication().getApplicationId();
+         ApplicationSubmissionContext submissionContext =
+             createApplicationSubmissionContext(newAppId, conf);
+         yarnClient.submitApplication(submissionContext);
+         LOG.info("Launched new MR AM"
+             + ", applicationId=" + newAppId);
+       } catch (IOException e) {
+         LOG.error("Failed to launch new MR AM"
+             + ", applicationId=" + newAppId, e);
+       }
+       // NOTE(review): the event is also posted when building or submitting
+       // the context failed after an id had been obtained - confirm that
+       // the event handler copes with an application that was never
+       // submitted.
+       if (newAppId != null) {
+         AMLaunchedEvent event = new AMLaunchedEvent(newAppId);
+         this.dispatcher.getEventHandler().handle(event);
+       }
+     }
+   }
+
+   /**
+    * Describes an existing file as an APPLICATION-visibility local resource.
+    *
+    * @param fs file context used to stat and resolve the path
+    * @param p path of the file
+    * @param type resource type to record
+    * @return the populated resource record
+    * @throws IOException if the file cannot be stat'ed or resolved
+    */
+   private LocalResource createApplicationResource(FileContext fs,
+       Path p, LocalResourceType type) throws IOException {
+     LocalResource rsrc = Records.newRecord(LocalResource.class);
+     FileStatus rsrcStat = fs.getFileStatus(p);
+     rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
+         .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
+     rsrc.setSize(rsrcStat.getLen());
+     rsrc.setTimestamp(rsrcStat.getModificationTime());
+     rsrc.setType(type);
+     rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+     return rsrc;
+   }
+
+   /**
+    * Builds the submission context used to launch a lazy MR AM.
+    *
+    * @param applicationId id the application is submitted under
+    * @param conf configuration the AM settings are read from
+    * @return the populated submission context
+    * @throws IOException if a local resource cannot be stat'ed or the
+    *         current user cannot be determined
+    */
+   public ApplicationSubmissionContext createApplicationSubmissionContext(
+       ApplicationId applicationId,
+       Configuration conf) throws IOException {
+
+     Resource capability = createAMCapability(conf);
+     Map<String, LocalResource> localResources = createAMLocalResources(conf);
+     List<String> commands = createAMLaunchCommands(conf);
+     Map<String, String> environment = createAMEnvironment(conf);
+
+     // TODO distributed cache ?
+     // TODO acls ?
+
+     String submitter =
+         UserGroupInformation.getCurrentUser().getShortUserName();
+
+     // Setup ContainerLaunchContext for AM container
+     ContainerLaunchContext amContainer = BuilderUtils
+         .newContainerLaunchContext(null, submitter, capability,
+             localResources, environment, commands, null, null, null);
+
+     // Set up the ApplicationSubmissionContext
+     ApplicationSubmissionContext appContext =
+         Records.newRecord(ApplicationSubmissionContext.class);
+     appContext.setApplicationId(applicationId);
+     appContext.setUser(submitter);
+     appContext.setQueue(
+         conf.get(AMPoolConfiguration.MR_AM_QUEUE_NAME,
+             YarnConfiguration.DEFAULT_QUEUE_NAME));
+     appContext.setApplicationName("MRAMLaunchedbyAMPoolService");
+     appContext.setCancelTokensWhenComplete(
+         conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
+     appContext.setAMContainerSpec(amContainer);
+
+     return appContext;
+   }
+
+   /** Reads the AM memory and virtual-core requirements from conf. */
+   private Resource createAMCapability(Configuration conf)
+       throws IOException {
+     Resource capability = Records.newRecord(Resource.class);
+     capability.setMemory(
+         conf.getInt(
+             AMPoolConfiguration.MR_AM_MEMORY_ALLOCATION_MB,
+             AMPoolConfiguration.DEFAULT_MR_AM_MEMORY_ALLOCATION_MB));
+     // TODO for now use default of 1 for virtual cores
+     capability.setVirtualCores(
+         conf.getInt(
+             MRJobConfig.MR_AM_CPU_VCORES,
+             MRJobConfig.DEFAULT_MR_AM_CPU_VCORES));
+     LOG.info("LazyMRMAM capability = " + capability);
+     return capability;
+   }
+
+   /** Collects the lazy AM config and the job jar(s) as local resources. */
+   private Map<String, LocalResource> createAMLocalResources(
+       Configuration conf) throws IOException {
+     Map<String, LocalResource> localResources =
+         new HashMap<String, LocalResource>();
+
+     if (lazyAMConfigPathStr == null
+         || lazyAMConfigPathStr.isEmpty()) {
+       LOG.error("Invalid path to lazy am config provided");
+       // TODO throw error?
+     } else {
+       LOG.info("Using lazyAMConf as a local resource for LazyAM"
+           + ", confPath=" + lazyAMConfigPathStr);
+       Path lazyAMConfPath = new Path(lazyAMConfigPathStr);
+       localResources.put(LazyAMConfig.LAZY_AM_JOB_XML_FILE,
+           createApplicationResource(defaultFileContext, lazyAMConfPath,
+               LocalResourceType.FILE));
+     }
+
+     String[] jarPaths = conf.getStrings(
+         AMPoolConfiguration.MR_AM_JOB_JAR_PATH,
+         AMPoolConfiguration.DEFAULT_MR_AM_JOB_JAR_PATH);
+     if (jarPaths == null) {
+       return localResources;
+     }
+     // A single jar is registered under the job-jar key; several jars are
+     // each registered under their own file name.
+     boolean singleJar = jarPaths.length == 1;
+     for (String jarPath : jarPaths) {
+       // Check for null before trimming so a null entry is skipped
+       // instead of raising a NullPointerException.
+       if (jarPath == null) {
+         continue;
+       }
+       jarPath = jarPath.trim();
+       if (jarPath.isEmpty()) {
+         continue;
+       }
+       Path jobJarPath = new Path(jarPath);
+       String resourceName =
+           singleJar ? MRJobConfig.JOB_JAR : jobJarPath.getName();
+       LocalResource rc = createApplicationResource(defaultFileContext,
+           jobJarPath, LocalResourceType.FILE);
+       localResources.put(resourceName, rc);
+     }
+     return localResources;
+   }
+
+   /** Builds the single merged shell command that starts the AM JVM. */
+   private List<String> createAMLaunchCommands(Configuration conf)
+       throws IOException {
+     List<String> vargs = new ArrayList<String>(8);
+     vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+
+     // TODO: why do we use 'conf' some places and 'jobConf' others?
+     long logSize = TaskLog.getTaskLogLength(new JobConf(conf));
+     String logLevel = conf.get(
+         MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
+     MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
+
+     // Add AM admin command opts before user command opts
+     // so that it can be overridden by user
+     vargs.add(conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
+         MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS));
+
+     vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
+         MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));
+
+     vargs.add(conf.get(AMPoolConfiguration.APPLICATION_MASTER_CLASS,
+         AMPoolConfiguration.DEFAULT_APPLICATION_MASTER_CLASS));
+     vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+         Path.SEPARATOR + ApplicationConstants.STDOUT);
+     vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+         Path.SEPARATOR + ApplicationConstants.STDERR);
+
+     // Final command: every argument followed by a single space.
+     StringBuilder mergedCommand = new StringBuilder();
+     for (CharSequence str : vargs) {
+       mergedCommand.append(str).append(" ");
+     }
+     List<String> commands = new ArrayList<String>(1);
+     commands.add(mergedCommand.toString());
+
+     LOG.debug("Command to launch container for ApplicationMaster is : "
+         + mergedCommand);
+     return commands;
+   }
+
+   /** Builds the AM environment: classpath, MR_AM_ENV and polling URL. */
+   private Map<String, String> createAMEnvironment(Configuration conf)
+       throws IOException {
+     // Setup the CLASSPATH in environment
+     // i.e. add { Hadoop jars, job jar, CWD } to classpath.
+     Map<String, String> environment = new HashMap<String, String>();
+     MRApps.setClasspath(environment, conf);
+
+     // Setup the environment variables (LD_LIBRARY_PATH, etc)
+     MRApps.setEnvFromInputString(environment,
+         conf.get(MRJobConfig.MR_AM_ENV));
+
+     environment.put(AMPoolConfiguration.LAZY_AM_POLLING_URL_ENV, pollingUrl);
+     return environment;
+   }
+
+ }
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/AMPoolStatus.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/AMPoolStatus.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/AMPoolStatus.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/AMPoolStatus.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool.rest;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+ /**
+  * JSON-serializable snapshot of the AM pool: the reporting host, the
+  * launch counters, and the known and unassigned applications.
+  */
+ public class AMPoolStatus {
+
+   @JsonProperty("hostname")
+   private String hostname;
+
+   @JsonProperty("pendingLaunches")
+   private int pendingLaunches;
+
+   @JsonProperty("failedLaunches")
+   private int failedLaunches;
+
+   @JsonProperty("applications")
+   private Map<String, String> applications;
+
+   @JsonProperty("unassignedApplications")
+   private List<String> unassignedApplications;
+
+   /**
+    * Creates a status with empty application collections.
+    */
+   public AMPoolStatus() {
+     // Assign the fields directly rather than through the public setters,
+     // so a subclass override cannot run on a half-constructed object.
+     this.applications = new TreeMap<String, String>();
+     this.unassignedApplications = new ArrayList<String>();
+   }
+
+   /**
+    * @return the applications
+    */
+   public Map<String, String> getApplications() {
+     return applications;
+   }
+
+   /**
+    * @param applications the applications to set
+    */
+   public void setApplications(Map<String, String> applications) {
+     this.applications = applications;
+   }
+
+   /**
+    * @return the pendingLaunches
+    */
+   public int getPendingLaunches() {
+     return pendingLaunches;
+   }
+
+   /**
+    * @param pendingLaunches the pendingLaunches to set
+    */
+   public void setPendingLaunches(int pendingLaunches) {
+     this.pendingLaunches = pendingLaunches;
+   }
+
+   /**
+    * @return the failedLaunches
+    */
+   public int getFailedLaunches() {
+     return failedLaunches;
+   }
+
+   /**
+    * @param failedLaunches the failedLaunches to set
+    */
+   public void setFailedLaunches(int failedLaunches) {
+     this.failedLaunches = failedLaunches;
+   }
+
+   /**
+    * @return the unassignedApplications
+    */
+   public List<String> getUnassignedApplications() {
+     return unassignedApplications;
+   }
+
+   /**
+    * @param unassignedApplications the unassignedApplications to set
+    */
+   public void setUnassignedApplications(List<String> unassignedApplications) {
+     this.unassignedApplications = unassignedApplications;
+   }
+
+   /**
+    * @return the hostname
+    */
+   public String getHostname() {
+     return hostname;
+   }
+
+   /**
+    * @param hostname the hostname to set
+    */
+   public void setHostname(String hostname) {
+     this.hostname = hostname;
+   }
+
+ }
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/AMPoolStatusService.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/AMPoolStatusService.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/AMPoolStatusService.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/AMPoolStatusService.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.ampool.rest;
+
+import java.util.Map.Entry;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.ampool.AMContext;
+import org.apache.tez.ampool.manager.AMPoolManager;
+
+ /**
+  * REST resource rooted at {@code /master/} that reports the state of the
+  * AM pool held by the {@link AMPoolManager} passed to
+  * {@link #init(AMPoolManager)}.
+  */
+ @Path("/master/")
+ public class AMPoolStatusService {
+
+   // Written once through init() and read by request threads, hence
+   // volatile.
+   private static volatile AMPoolManager manager;
+
+   /**
+    * Registers the manager whose state is reported by this resource.
+    *
+    * @param manager the pool manager to query
+    */
+   public static void init(AMPoolManager manager) {
+     AMPoolStatusService.manager = manager;
+   }
+
+   /**
+    * Builds the current pool status.
+    *
+    * @param req the incoming request; its local name is reported as the
+    *        hostname
+    * @return the launch counters, the unassigned application ids and, for
+    *         every tracked application, its current attempt id ({@code null}
+    *         when it has none)
+    * @throws IllegalStateException if {@link #init(AMPoolManager)} has not
+    *         been called
+    */
+   @Path("status")
+   @GET
+   @Consumes(MediaType.APPLICATION_JSON)
+   @Produces({MediaType.APPLICATION_JSON})
+   public AMPoolStatus getStatus(
+       @Context HttpServletRequest req) {
+     final AMPoolManager poolManager = manager;
+     if (poolManager == null) {
+       throw new IllegalStateException(
+           "AMPoolStatusService has not been initialized");
+     }
+     AMPoolStatus status = new AMPoolStatus();
+     status.setHostname(req.getLocalName());
+     status.setPendingLaunches(poolManager.getPendingLaunches());
+     status.setFailedLaunches(poolManager.getFailedLaunches());
+     for (AMContext amContext : poolManager.getUnassignedApplications()) {
+       status.getUnassignedApplications().add(
+           amContext.getApplicationId().toString());
+     }
+     for (Entry<ApplicationId, AMContext> entry :
+         poolManager.getApplicationsDump().entrySet()) {
+       // Read the attempt id once: the manager updates it from other
+       // threads, so a separate null check and use could see two values.
+       Object attemptId = entry.getValue().getCurrentApplicationAttemptId();
+       String attemptIdStr = null;
+       if (attemptId != null) {
+         attemptIdStr = attemptId.toString();
+       }
+       status.getApplications().put(entry.getKey().toString(), attemptIdStr);
+     }
+     return status;
+   }
+
+ }
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/ApplicationPollResponse.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/ApplicationPollResponse.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/ApplicationPollResponse.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/ApplicationPollResponse.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.ampool.rest;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+ /**
+  * Response to an application poll: the id of the assigned application,
+  * where its configuration file and jar are located, and when it was
+  * submitted.
+  */
+ public class ApplicationPollResponse {
+
+   private String applicationIdStr;
+
+   private String configurationFileLocation;
+
+   private String applicationJarLocation;
+
+   private long applicationSubmissionTime;
+
+   /**
+    * Creates a response that carries no application; every field keeps its
+    * default value.
+    */
+   public ApplicationPollResponse() {
+   }
+
+   /**
+    * Creates a response for the given application.
+    *
+    * @param applicationId id that is stored in its string form
+    * @param applicationSubmissionTime submission time to report
+    */
+   public ApplicationPollResponse(ApplicationId applicationId,
+       long applicationSubmissionTime) {
+     setApplicationIdStr(ConverterUtils.toString(applicationId));
+     this.applicationSubmissionTime = applicationSubmissionTime;
+   }
+
+   /** @return the application id in string form */
+   public String getApplicationIdStr() {
+     return this.applicationIdStr;
+   }
+
+   /** @param applicationIdStr the application id in string form */
+   public void setApplicationIdStr(String applicationIdStr) {
+     this.applicationIdStr = applicationIdStr;
+   }
+
+   /** @return the application jar location */
+   public String getApplicationJarLocation() {
+     return this.applicationJarLocation;
+   }
+
+   /** @param applicationJarLocation the application jar location */
+   public void setApplicationJarLocation(String applicationJarLocation) {
+     this.applicationJarLocation = applicationJarLocation;
+   }
+
+   /** @return the configuration file location */
+   public String getConfigurationFileLocation() {
+     return this.configurationFileLocation;
+   }
+
+   /** @param configurationFileLocation the configuration file location */
+   public void setConfigurationFileLocation(String configurationFileLocation) {
+     this.configurationFileLocation = configurationFileLocation;
+   }
+
+   /**
+    * @return all four fields as comma-separated {@code name=value} pairs
+    */
+   @Override
+   public String toString() {
+     return new StringBuilder()
+         .append("applicationId=").append(applicationIdStr)
+         .append(", configurationFileLocation=")
+         .append(configurationFileLocation)
+         .append(", applicationJarLocation=")
+         .append(applicationJarLocation)
+         .append(", applicationSubmissionTime=")
+         .append(applicationSubmissionTime)
+         .toString();
+   }
+
+   /** @return the application submission time */
+   public long getApplicationSubmissionTime() {
+     return this.applicationSubmissionTime;
+   }
+
+   /** @param applicationSubmissionTime the application submission time */
+   public void setApplicationSubmissionTime(long applicationSubmissionTime) {
+     this.applicationSubmissionTime = applicationSubmissionTime;
+   }
+
+ }