You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC

svn commit: r1457129 [3/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-amp...

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolConfiguration.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolConfiguration.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolConfiguration.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+
+public class AMPoolConfiguration extends Configuration {
+
+  private static final String AMPOOL_DEFAULT_XML_FILE =
+      "tez-ampool-default.xml";
+  private static final String AMPOOL_SITE_XML_FILE = "tez-ampool-site.xml";
+  public static final String AMPOOL_APP_XML_FILE = "tez-ampool-app.xml";
+
+  public static final String APP_NAME = "AMPoolService";
+
+  static {
+    Configuration.addDefaultResource(AMPOOL_DEFAULT_XML_FILE);
+    Configuration.addDefaultResource(AMPOOL_SITE_XML_FILE);
+    Configuration.addDefaultResource(AMPOOL_APP_XML_FILE);
+  }
+
+  //Configurations
+
+  public static final String AMPOOL_PREFIX = "tez.ampool.";
+
+  // Client
+
+  public static final String AM_MASTER_MEMORY_MB =
+      AMPOOL_PREFIX + "am.master_memory";
+  public static final int DEFAULT_AM_MASTER_MEMORY_MB = 1024;
+
+  public static final String AM_MASTER_QUEUE =
+      AMPOOL_PREFIX + "am.master_queue";
+  public static final String DEFAULT_AM_MASTER_QUEUE = "default";
+
+  // Server
+  public static final String WS_PORT =
+      AMPOOL_PREFIX + "ws.port";
+  public static final int DEFAULT_WS_PORT = 12999;
+
+  // AM
+
+  public static final String AM_POOL_SIZE =
+      AMPOOL_PREFIX + "am-pool-size";
+  public static final int DEFAULT_AM_POOL_SIZE = 3;
+
+  public static final String MAX_AM_POOL_SIZE =
+      AMPOOL_PREFIX + "max-am-pool-size";
+  public static final int DEFAULT_MAX_AM_POOL_SIZE = 5;
+
+  public static final String AM_LAUNCH_NEW_AM_AFTER_APP_COMPLETION =
+      AMPOOL_PREFIX + "launch-new-am-after-app-completion";
+  public static final boolean DEFAULT_AM_LAUNCH_NEW_AM_AFTER_APP_COMPLETION =
+      true;
+
+  public static final String MAX_AM_LAUNCH_FAILURES =
+      AMPOOL_PREFIX + "max-am-launch-failures";
+  public static final int DEFAULT_MAX_AM_LAUNCH_FAILURES = 10;
+
+  public static final String AM_STAGING_DIR =
+      AMPOOL_PREFIX + "am.staging-dir";
+  public static final String DEFAULT_AM_STAGING_DIR =
+      "/tmp/tez/ampool/staging/";
+
+  // RM Client proxy
+
+  public static final String RM_PROXY_CLIENT_THREAD_COUNT =
+      AMPOOL_PREFIX + "rm-proxy-client.thread-count";
+  public static final int DEFAULT_RM_PROXY_CLIENT_THREAD_COUNT = 10;
+
+  public static final String RM_PROXY_CLIENT_ADDRESS =
+      AMPOOL_PREFIX + "address";
+  public static final int DEFAULT_RM_PROXY_CLIENT_PORT = 10030;
+  public static final String DEFAULT_RM_PROXY_CLIENT_ADDRESS = "0.0.0.0:" +
+      DEFAULT_RM_PROXY_CLIENT_PORT;
+
+  // MR AM related
+
+  // Memory to allocate for lazy AM
+  public static final String MR_AM_MEMORY_ALLOCATION_MB =
+      AMPOOL_PREFIX + "mr-am.memory-allocation-mb";
+  public static final int DEFAULT_MR_AM_MEMORY_ALLOCATION_MB = 1536;
+
+  // Queue to launch LazyMRAM
+  public static final String MR_AM_QUEUE_NAME =
+      AMPOOL_PREFIX + "mr-am.queue-name";
+
+  public static final String MR_AM_JOB_JAR_PATH =
+      AMPOOL_PREFIX + "mr-am.job-jar-path";
+  public static final String DEFAULT_MR_AM_JOB_JAR_PATH =
+      "";
+
+  public static final String APPLICATION_MASTER_CLASS =
+      AMPOOL_PREFIX + "mr-am.application-master-class";
+  public static final String DEFAULT_APPLICATION_MASTER_CLASS =
+      "org.apache.hadoop.mapreduce.v2.app2.lazy.LazyMRAppMaster";
+
+  public static final String LAZY_AM_POLLING_URL_ENV =
+      "LAZY_AM_POLLING_URL";
+
+  public static final String TMP_DIR_PATH =
+      AMPOOL_PREFIX + "tmp-dir-path";
+  public static final String DEFAULT_TMP_DIR_PATH = "/tmp/ampoolservice/";
+
+  public static final String LAZY_AM_CONF_FILE_PATH =
+      AMPOOL_PREFIX + "lazy-am-conf-file-path";
+
+  /**
+   * CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
+   * entries
+   */
+  public static final String AMPOOL_APPLICATION_CLASSPATH = AMPOOL_PREFIX
+      + "application.classpath";
+
+  /**
+   * Default CLASSPATH for YARN applications. A comma-separated list of
+   * CLASSPATH entries
+   */
+  public static final String[] DEFAULT_AMPOOL_APPLICATION_CLASSPATH = {
+      ApplicationConstants.Environment.HADOOP_CONF_DIR.$(),
+      ApplicationConstants.Environment.HADOOP_COMMON_HOME.$()
+          + "/share/hadoop/common/*",
+      ApplicationConstants.Environment.HADOOP_COMMON_HOME.$()
+          + "/share/hadoop/common/lib/*",
+      ApplicationConstants.Environment.HADOOP_HDFS_HOME.$()
+          + "/share/hadoop/hdfs/*",
+      ApplicationConstants.Environment.HADOOP_HDFS_HOME.$()
+          + "/share/hadoop/hdfs/lib/*",
+      ApplicationConstants.Environment.HADOOP_YARN_HOME.$()
+          + "/share/hadoop/yarn/*",
+      ApplicationConstants.Environment.HADOOP_YARN_HOME.$()
+          + "/share/hadoop/yarn/lib/*"
+  };
+
+  public AMPoolConfiguration() {
+    super();
+  }
+
+  public AMPoolConfiguration(Configuration conf) {
+    super(conf);
+    if (! (conf instanceof AMPoolConfiguration)) {
+      this.reloadConfiguration();
+    }
+  }
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolContext.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolContext.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolContext.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.AMRMClient;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.ampool.manager.AMPoolManager;
+
+public class AMPoolContext {
+
+  private final Dispatcher dispatcher;
+
+  private final YarnClient rmYarnClient;
+
+  private final AMRMClient amRMClient;
+
+  private final AMPoolManager amPoolManager;
+
+  private Resource minResourceCapability;
+  private Resource maxResourceCapability;
+
+  private final String nmHost;
+
+  public AMPoolContext(Configuration conf, String nmHost,
+      Dispatcher dispatcher, YarnClient yarnClient, AMRMClient amRMClient,
+      AMPoolManager amPoolManager) {
+    // TODO
+    this.dispatcher = dispatcher;
+    this.rmYarnClient = yarnClient;
+    this.amRMClient = amRMClient;
+    this.amPoolManager = amPoolManager;
+    this.nmHost = nmHost;
+  }
+
+  public ApplicationId getNewApplicationId() {
+    return amPoolManager.getAM();
+  }
+
+  public YarnClient getRMYarnClient() {
+    return rmYarnClient;
+  }
+
+  public AMContext getAMContext(ApplicationId applicationId) {
+    return amPoolManager.getAMContext(applicationId);
+  }
+
+  public SubmitApplicationResponse submitApplication(
+      SubmitApplicationRequest request) {
+    amPoolManager.submitApplication(request);
+    SubmitApplicationResponse response =
+        Records.newRecord(SubmitApplicationResponse.class);
+    return response;
+  }
+
+  public AMRMClient getAMRMClient() {
+    return amRMClient;
+  }
+
+  public synchronized Resource getMaxResourceCapability() {
+    return maxResourceCapability;
+  }
+
+  public synchronized Resource getMinResourceCapability() {
+    return minResourceCapability;
+  }
+
+  public synchronized void setMaxResourceCapability(Resource resource) {
+    maxResourceCapability = resource;
+  }
+
+  public synchronized void setMinResourceCapability(Resource resource) {
+    minResourceCapability = resource;
+  }
+
+  public Dispatcher getDispatcher() {
+    return dispatcher;
+  }
+
+  public boolean isManagedApp(ApplicationId applicationId) {
+    return amPoolManager.isManagedApp(applicationId);
+  }
+
+  public String getNMHost() {
+    return nmHost;
+  }
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolService.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolService.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolService.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolService.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.client.AMRMClient;
+import org.apache.hadoop.yarn.client.AMRMClientImpl;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.ampool.manager.AMPoolEventType;
+import org.apache.tez.ampool.manager.AMPoolManager;
+import org.apache.tez.ampool.rest.AMPoolStatusService;
+import org.apache.tez.ampool.rest.ApplicationPollService;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+
+import com.sun.jersey.spi.container.servlet.ServletContainer;
+
+public class AMPoolService extends CompositeService {
+
+  private static final Log LOG = LogFactory.getLog(AMPoolService.class);
+
+  private final long startTime = System.currentTimeMillis();
+  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+  private AMPoolContext context;
+  private AMPoolClientRMProxy clientRMProxy;
+  private AMPoolManager amPoolManager;
+  private RMHeartbeatService rmHeartbeatService;
+  private AMMonitorService amMonitorService;
+  private Server webAppServer = null;
+  private Configuration conf;
+
+  private YarnClient yarnRMClient;
+  private AMRMClient amRMClient;
+
+  private Dispatcher dispatcher;
+
+  private final ApplicationAttemptId applicationAttemptId;
+  private final ContainerId containerId;
+  private final String nmHost;
+  private final int nmPort;
+  private int webAppServerPort;
+  private String trackerUrl;
+
+  private final boolean inCLIMode;
+
+  public String tmpAppDirPath = null;
+
+  private FileSystem localFs;
+
+  private static final Options opts;
+
+  static {
+    opts = new Options();
+    opts.addOption("cli", false, "Run via command-line in non-AM mode");
+  }
+
+  public AMPoolService(ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId, String nmHost, int nmPort,
+      boolean inCLIMode) {
+    super(AMPoolService.class.getName());
+    this.applicationAttemptId = applicationAttemptId;
+    this.containerId = containerId;
+    this.nmHost = nmHost;
+    this.nmPort = nmPort;
+    this.inCLIMode = inCLIMode;
+  }
+
+  public AMPoolService(ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId, String nmHost, int nmPort) {
+    this(applicationAttemptId, containerId, nmHost, nmPort, false);
+    LOG.info("Created AMPoolService"
+        + ", applicationAttemptId=" + applicationAttemptId
+        + ", containerId=" + containerId
+        + ", nmHost=" + nmHost
+        + ", nmPort=" + nmPort
+        + ", launchTime=" + startTime
+        + ", inCLIMode=" + inCLIMode);
+  }
+
+  public AMPoolService(String localhost) {
+    this(null, null, localhost, -1, true);
+    LOG.info("Created AMPoolService"
+        + ", host=" + nmHost
+        + ", launchTime=" + startTime
+        + ", inCLIMode=" + inCLIMode);
+  }
+
+  @Override
+  public void init(final Configuration conf) {
+    this.conf = conf;
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    dispatcher = new AsyncDispatcher();
+    addIfService(dispatcher);
+
+    yarnRMClient = new YarnClientImpl();
+    addIfService(yarnRMClient);
+
+    if (inCLIMode) {
+      amRMClient = null;
+    } else {
+      amRMClient = new AMRMClientImpl(applicationAttemptId);
+      addIfService(amRMClient);
+    }
+
+    tmpAppDirPath   = conf.get(AMPoolConfiguration.TMP_DIR_PATH,
+        AMPoolConfiguration.DEFAULT_TMP_DIR_PATH)
+      + File.separator;
+
+    amPoolManager = new AMPoolManager(conf, dispatcher, yarnRMClient, nmHost,
+        inCLIMode, tmpAppDirPath);
+    dispatcher.register(AMPoolEventType.class, amPoolManager);
+    addIfService(amPoolManager);
+
+    context = new AMPoolContext(conf, nmHost, dispatcher,
+        yarnRMClient, amRMClient, amPoolManager);
+
+    amMonitorService = new AMMonitorService(context);
+    dispatcher.register(AMMonitorEventType.class, amMonitorService);
+    addIfService(amMonitorService);
+
+    clientRMProxy = new AMPoolClientRMProxy(context);
+    addIfService(clientRMProxy);
+
+    webAppServerPort = conf.getInt(AMPoolConfiguration.WS_PORT,
+        AMPoolConfiguration.DEFAULT_WS_PORT);
+    trackerUrl = "http://" + nmHost
+        + ":" + webAppServerPort
+        + "/master/status";
+
+    if (!inCLIMode) {
+      rmHeartbeatService = new RMHeartbeatService(context, trackerUrl,
+          webAppServerPort);
+      addIfService(rmHeartbeatService);
+    }
+
+    webAppServer = new Server(webAppServerPort);
+    webAppServer.setThreadPool(new QueuedThreadPool(50));
+    webAppServer.setStopAtShutdown(true);
+
+    ServletContextHandler root = new ServletContextHandler(webAppServer, "/");
+    ServletHolder rootServlet = root.addServlet(DefaultServlet.class, "/");
+    rootServlet.setInitOrder(1);
+
+    ServletHolder restHandler = new ServletHolder(ServletContainer.class);
+    restHandler.setInitParameter(
+        "com.sun.jersey.config.property.resourceConfigClass",
+        "com.sun.jersey.api.core.PackagesResourceConfig");
+    restHandler.setInitParameter(
+        "com.sun.jersey.config.property.packages",
+        "org.apache.tez.ampool.rest");
+    restHandler.setInitParameter("com.sun.jersey.api.json.POJOMappingFeature",
+        "true");
+    root.addServlet(restHandler, "/*");
+    restHandler.setInitOrder(2);
+
+    ApplicationPollService.init(amPoolManager);
+    AMPoolStatusService.init(amPoolManager);
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    if (!inCLIMode) {
+      LOG.debug("Starting AMPoolService"
+          + ", applicationAttemptId=" + applicationAttemptId
+          + ", containerId=" + containerId
+          + ", nmHost=" + nmHost
+          + ", nmPort=" + nmPort
+          + ", launchTime=" + startTime
+          + ", startingTime=" + System.currentTimeMillis()
+          + ", inCLIMode=" + inCLIMode);
+    }
+    super.start();
+    try {
+      webAppServer.start();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    if (!inCLIMode) {
+      LOG.info("Started AMPoolService"
+          + ", applicationAttemptId=" + applicationAttemptId
+          + ", containerId=" + containerId
+          + ", nmHost=" + nmHost
+          + ", nmPort=" + nmPort
+          + ", launchTime=" + startTime
+          + ", startedTime=" + System.currentTimeMillis()
+          + ", inCLIMode=" + inCLIMode);
+    } else {
+      LOG.info("Started AMPoolService. Use " + trackerUrl
+          + " to monitor the AMPoolService");
+    }
+  }
+
+  @Override
+  public void stop() {
+    if (webAppServer != null) {
+      try {
+        webAppServer.stop();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+    super.stop();
+  }
+
+  protected void addIfService(Object object) {
+    if (object instanceof Service) {
+      addService((Service) object);
+    }
+  }
+
+  private static void validateInputParam(String value, String param)
+      throws IOException {
+    if (value == null) {
+      String msg = param + " is null";
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+  }
+
+  static class AMPoolServiceShutdownHook implements Runnable {
+    AMPoolService appMaster;
+    AMPoolServiceShutdownHook(AMPoolService appMaster) {
+      this.appMaster = appMaster;
+    }
+    public void run() {
+      LOG.info("AMPoolService received a signal. Shutting down now.");
+      Configuration conf = appMaster.conf;
+      appMaster.stop();
+      // TODO fix - this is a problem with multiple attempts of this AM
+      // Client does not remain running till end of AM
+      FileSystem fs;
+      try {
+        if (!appMaster.inCLIMode) {
+          fs = FileSystem.get(conf);
+          Path appDirOnDFS = new Path(
+              fs.getHomeDirectory(),
+              AMPoolConfiguration.APP_NAME + "/"
+              + appMaster.applicationAttemptId.getApplicationId().toString()
+              + "/");
+          LOG.info("Deleting app dir on dfs"
+              + ", path=" + appDirOnDFS.toString());
+          fs.delete(appDirOnDFS, true);
+        } else {
+          if (appMaster.tmpAppDirPath != null) {
+            LOG.info("Deleting working dir: " + appMaster.tmpAppDirPath);
+            appMaster.localFs.delete(new Path(appMaster.tmpAppDirPath), true);
+          }
+        }
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+
+    }
+  }
+
+  private static AMPoolService initNonCLIAM(Configuration conf)
+      throws IOException {
+    String containerIdStr =
+        System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
+    String nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV);
+    String nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV);
+
+    validateInputParam(containerIdStr,
+        ApplicationConstants.AM_CONTAINER_ID_ENV);
+    validateInputParam(nodeHostString, ApplicationConstants.NM_HOST_ENV);
+    validateInputParam(nodePortString, ApplicationConstants.NM_PORT_ENV);
+
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    ApplicationAttemptId applicationAttemptId =
+          containerId.getApplicationAttemptId();
+
+    AMPoolService appMaster =
+          new AMPoolService(applicationAttemptId, containerId, nodeHostString,
+              Integer.parseInt(nodePortString));
+    return appMaster;
+  }
+
+  public static void main(String[] args) throws IOException, ParseException {
+
+    Configuration conf = new AMPoolConfiguration(new YarnConfiguration());
+    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
+    CommandLine cliParser = new GnuParser().parse(opts, args);
+    AMPoolService appMaster = null;
+
+    String localhost = InetAddress.getLocalHost().getCanonicalHostName();
+
+    if (!cliParser.hasOption("cli")) {
+      appMaster = initNonCLIAM(conf);
+    } else {
+      appMaster = new AMPoolService(localhost);
+    }
+
+    ShutdownHookManager.get().addShutdownHook(
+        new AMPoolServiceShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
+
+    appMaster.init(conf);
+    appMaster.start();
+
+  }
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/RMHeartbeatService.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/RMHeartbeatService.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/RMHeartbeatService.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/RMHeartbeatService.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.client.AMRMClient;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+public class RMHeartbeatService extends AbstractService {
+
+  private static final Log LOG = LogFactory.getLog(RMHeartbeatService.class);
+
+  private long heartbeatInterval = 1000l;
+  Timer scheduleTimer;
+  RMAllocateTimerTask rmAllocateTimerTask;
+  private final AMRMClient amRmClient;
+  private final AMPoolContext context;
+  private final String trackerUrl;
+  private final int trackerPort;
+
+  public RMHeartbeatService(AMPoolContext context, String trackerUrl,
+      int trackerPort) {
+    super(RMHeartbeatService.class.getName());
+    // TODO Auto-generated constructor stub
+    this.context = context;
+    this.amRmClient = context.getAMRMClient();
+    this.trackerUrl = trackerUrl;
+    this.trackerPort = trackerPort;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    try {
+      LOG.info("Registering AMPoolService with RM"
+          + ", nmHost=" + context.getNMHost()
+          + ", trackerPort=" + trackerPort
+          + ", trackerUrl=" + trackerUrl);
+      RegisterApplicationMasterResponse response =
+          amRmClient.registerApplicationMaster(
+              context.getNMHost(), trackerPort, trackerUrl);
+      context.setMinResourceCapability(response.getMinimumResourceCapability());
+      context.setMaxResourceCapability(response.getMaximumResourceCapability());
+    } catch (YarnRemoteException e) {
+      throw new RuntimeException(e);
+    }
+
+    scheduleTimer = new Timer("RMAllocateTimer", true);
+    rmAllocateTimerTask = new RMAllocateTimerTask();
+    scheduleTimer.scheduleAtFixedRate(rmAllocateTimerTask, heartbeatInterval,
+        heartbeatInterval);
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    if (rmAllocateTimerTask != null) {
+      rmAllocateTimerTask.stop();
+    }
+  }
+
+  private class RMAllocateTimerTask extends TimerTask {
+    private volatile boolean shouldRun = true;
+
+    @Override
+    public void run() {
+      if (shouldRun) {
+        try {
+          amRmClient.allocate(0);
+        } catch (YarnRemoteException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+    }
+
+    public void stop() {
+      shouldRun = false;
+      this.cancel();
+    }
+  }
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/client/AMPoolClient.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/client/AMPoolClient.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/client/AMPoolClient.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/client/AMPoolClient.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool.client;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.v2.app2.lazy.LazyAMConfig;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.ampool.AMPoolService;
+import org.apache.tez.ampool.AMPoolConfiguration;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+public class AMPoolClient extends YarnClientImpl {
+
+  private static final Log LOG = LogFactory.getLog(AMPoolClient.class);
+
+  // Configuration
+  private Configuration conf;
+
+  private Options opts;
+
+  private String appMasterJar;
+
+  private final String appName = AMPoolConfiguration.APP_NAME;
+  private static final String appMasterMainClass = AMPoolService.class.getName();
+
+  private String tmpAppDirPath = null;
+
+  public AMPoolClient() {
+    opts = new Options();
+    opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
+    opts.addOption("jar", true, "Jar file containing the application master");
+    opts.addOption("help", false, "Print usage");
+  }
+
+  /**
+   * @param args Command line arguments
+   */
+  public static void main(String[] args) {
+    boolean result = false;
+    AMPoolClient amPoolClient = null;
+    try {
+      amPoolClient = new AMPoolClient();
+      LOG.info("Initializing AMPoolClient");
+      try {
+        boolean doRun = amPoolClient.init(args);
+        if (!doRun) {
+          System.exit(0);
+        }
+      } catch (IllegalArgumentException e) {
+        System.err.println(e.getLocalizedMessage());
+        amPoolClient.printUsage();
+        System.exit(-1);
+      }
+      result = amPoolClient.run();
+    } catch (Throwable t) {
+      LOG.fatal("Error running CLient", t);
+      System.exit(1);
+    } finally {
+      if (amPoolClient != null
+          && amPoolClient.tmpAppDirPath != null) {
+        LOG.info("Deleting working dir: " + amPoolClient.tmpAppDirPath);
+        File dir = new File(amPoolClient.tmpAppDirPath);
+        for (File f : dir.listFiles()) {
+          f.delete();
+        }
+        dir.delete();
+      }
+    }
+    if (result) {
+      LOG.info("Application completed successfully");
+      System.exit(0);
+    }
+    LOG.error("Application failed to complete successfully");
+    System.exit(2);
+  }
+
+  public boolean init(String[] args) throws Exception {
+    this.conf = new AMPoolConfiguration(new YarnConfiguration());
+    // TODO Auto-generated method stub
+    super.init(conf);
+
+    CommandLine cliParser = new GnuParser().parse(opts, args);
+
+    if (args.length == 0) {
+      throw new IllegalArgumentException("No args specified");
+    }
+
+    if (cliParser.hasOption("help")) {
+      printUsage();
+      return false;
+    }
+
+    if (!cliParser.hasOption("jar")) {
+      throw new IllegalArgumentException("No jar file specified for application master");
+    }
+
+
+    if (!cliParser.hasOption("jar")) {
+      throw new IllegalArgumentException(
+          "No jar file specified for application master");
+    }
+
+    appMasterJar = cliParser.getOptionValue("jar");
+
+    return true;
+  }
+
+  private void printUsage() {
+    // TODO Auto-generated method stub
+  }
+
+  public boolean run() throws IOException {
+    LOG.info("Running AMPoolClient");
+    start();
+
+    // Get a new application id
+    GetNewApplicationResponse newApp = super.getNewApplication();
+    ApplicationId appId = newApp.getApplicationId();
+    System.out.println("Starting AMPoolMaster with Application ID: "
+        + appId);
+
+    int minMem = newApp.getMinimumResourceCapability().getMemory();
+    int maxMem = newApp.getMaximumResourceCapability().getMemory();
+    LOG.info("Min mem capabililty of resources in this cluster " + minMem);
+    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+    int amMemory = conf.getInt(AMPoolConfiguration.AM_MASTER_MEMORY_MB,
+        AMPoolConfiguration.DEFAULT_AM_MASTER_MEMORY_MB);
+
+    // Create launch context for app master
+    LOG.info("Setting up application submission context for ASM");
+    ApplicationSubmissionContext appContext =
+        Records.newRecord(ApplicationSubmissionContext.class);
+
+    appContext.setApplicationId(appId);
+    appContext.setApplicationName(appName);
+
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer = Records.newRecord(
+        ContainerLaunchContext.class);
+
+    // set local resources for the application master
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+
+    LOG.info("Copy App Master jar from local filesystem"
+        + " and add to local environment");
+    FileSystem fs = FileSystem.get(conf);
+    Path src = new Path(appMasterJar);
+    String pathSuffix = appName + "/" + appId.toString() + "/AMPoolService.jar";
+    Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
+    fs.copyFromLocalFile(false, true, src, dst);
+    FileStatus destStatus = fs.getFileStatus(dst);
+    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+
+    amJarRsrc.setType(LocalResourceType.FILE);
+    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+    amJarRsrc.setTimestamp(destStatus.getModificationTime());
+    amJarRsrc.setSize(destStatus.getLen());
+    localResources.put("AMPoolService.jar",  amJarRsrc);
+
+    tmpAppDirPath   = conf.get(AMPoolConfiguration.TMP_DIR_PATH,
+        AMPoolConfiguration.DEFAULT_TMP_DIR_PATH)
+      + File.separator + appId.toString() + File.separator;
+    File appDir = new File(tmpAppDirPath);
+    appDir.deleteOnExit();
+
+    LOG.info("Writing contents of LazyAMConfig to local fs");
+
+    Configuration lazyAMConfToUpload = new LazyAMConfig();
+    File lazyAmConfFile = new File(tmpAppDirPath,
+        LazyAMConfig.LAZY_AM_JOB_XML_FILE);
+    LOG.info("Writing contents of LazyAMConfig to local fs"
+        + ", path=" + lazyAmConfFile.getAbsolutePath());
+    lazyAmConfFile.getParentFile().mkdirs();
+    lazyAMConfToUpload.writeXml(new FileOutputStream(lazyAmConfFile));
+
+    Path lazyAmConfPathOnDfs = new Path(
+        fs.getHomeDirectory(),
+        appName + "/" + appId.toString() + "/"
+        + LazyAMConfig.LAZY_AM_JOB_XML_FILE);
+
+    LOG.info("Uploading contents of LazyAMConfig to dfs"
+        + ", srcpath=" + lazyAmConfFile.getAbsolutePath()
+        + ", destpath=" + lazyAmConfPathOnDfs.toString());
+    fs.copyFromLocalFile(false, true,
+        new Path(lazyAmConfFile.getAbsolutePath()), lazyAmConfPathOnDfs);
+    fs.getFileStatus(lazyAmConfPathOnDfs);
+
+    Configuration amPoolConfToUpload = new AMPoolConfiguration();
+    amPoolConfToUpload.set(
+        AMPoolConfiguration.LAZY_AM_CONF_FILE_PATH,
+        lazyAmConfPathOnDfs.toString());
+
+    File amPoolConfFile = new File(tmpAppDirPath,
+        AMPoolConfiguration.AMPOOL_APP_XML_FILE);
+    amPoolConfFile.getParentFile().mkdirs();
+    LOG.info("Writing contents of AmPoolConf to local fs"
+        + ", path=" + amPoolConfFile.getAbsolutePath());
+    amPoolConfToUpload.writeXml(new FileOutputStream(amPoolConfFile));
+
+    Path amPoolConfPathOnDfs = new Path(
+        fs.getHomeDirectory(),
+        appName + "/" + appId.toString() + "/"
+        + AMPoolConfiguration.AMPOOL_APP_XML_FILE);
+    LOG.info("Uploading contents of AmPoolConfig to dfs"
+        + ", srcpath=" + amPoolConfFile.getAbsolutePath()
+        + ", destpath=" + amPoolConfPathOnDfs.toString());
+    fs.copyFromLocalFile(false, true,
+        new Path(amPoolConfFile.getAbsolutePath()),
+        amPoolConfPathOnDfs);
+    FileStatus amPoolConfFileStatus =
+        fs.getFileStatus(amPoolConfPathOnDfs);
+
+    LocalResource amPoolConfRsrc = Records.newRecord(LocalResource.class);
+    amPoolConfRsrc.setType(LocalResourceType.FILE);
+    amPoolConfRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+    amPoolConfRsrc.setResource(ConverterUtils.getYarnUrlFromPath(
+        amPoolConfPathOnDfs));
+    amPoolConfRsrc.setTimestamp(amPoolConfFileStatus.getModificationTime());
+    amPoolConfRsrc.setSize(amPoolConfFileStatus.getLen());
+    LOG.info("Adding AMPoolConfig as a local resource for AMPoolServiceAM");
+    localResources.put(AMPoolConfiguration.AMPOOL_APP_XML_FILE,
+        amPoolConfRsrc);
+
+    amContainer.setLocalResources(localResources);
+
+    // Set the env variables to be setup in the env
+    // where the application master will be run
+    LOG.info("Set the environment for the application master");
+    Map<String, String> env = new HashMap<String, String>();
+
+    StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:.");
+    for (String c : conf.getStrings(
+        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+      classPathEnv.append(':');
+      classPathEnv.append(c.trim());
+    }
+
+    for (String c : conf.getStrings(
+        MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
+        MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH)) {
+      classPathEnv.append(':');
+      classPathEnv.append(c.trim());
+    }
+
+    for (String c : conf.getStrings(
+        AMPoolConfiguration.AMPOOL_APPLICATION_CLASSPATH,
+        AMPoolConfiguration.DEFAULT_AMPOOL_APPLICATION_CLASSPATH)) {
+      classPathEnv.append(':');
+      classPathEnv.append(c.trim());
+    }
+
+    // add the runtime classpath needed for tests to work
+    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+      classPathEnv.append(':');
+      classPathEnv.append(System.getProperty("java.class.path"));
+    }
+
+    env.put("CLASSPATH", classPathEnv.toString());
+
+    amContainer.setEnvironment(env);
+
+    // Set the necessary command to execute the application master
+    Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+    // Set java executable command
+    LOG.info("Setting up app master command");
+    vargs.add("${JAVA_HOME}" + "/bin/java");
+    // Set Xmx based on am memory size
+    int normalizeMem = (int)(amMemory * 0.75);
+    vargs.add("-Xmx" + normalizeMem + "m");
+    // Set class name
+    vargs.add(appMasterMainClass);
+
+    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+        + "/AMPoolService.stdout");
+    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+        + "/AMPoolService.stderr");
+
+    // Get final commmand
+    StringBuilder command = new StringBuilder();
+    for (CharSequence str : vargs) {
+      command.append(str).append(" ");
+    }
+
+    LOG.info("Completed setting up app master command " + command.toString());
+    List<String> commands = new ArrayList<String>();
+    commands.add(command.toString());
+    amContainer.setCommands(commands);
+
+    // Set up resource type requirements
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(amMemory);
+    amContainer.setResource(capability);
+
+    appContext.setAMContainerSpec(amContainer);
+
+    // Set the queue to which this application is to be submitted in the RM
+    String amQueue = conf.get(AMPoolConfiguration.AM_MASTER_QUEUE,
+        AMPoolConfiguration.DEFAULT_AM_MASTER_QUEUE);
+    appContext.setQueue(amQueue);
+
+    // Submit the application to the applications manager
+    LOG.info("Submitting application to ASM");
+    super.submitApplication(appContext);
+
+    LOG.info("Monitoring application, appId="
+        + appId);
+    while (true) {
+      ApplicationReport report = super.getApplicationReport(appId);
+      if (report.getFinalApplicationStatus().equals(
+          FinalApplicationStatus.FAILED)
+          || report.getFinalApplicationStatus().equals(
+              FinalApplicationStatus.KILLED)) {
+        LOG.fatal("AMPoolService failed to start up");
+        return false;
+      }
+      else if (report.getYarnApplicationState().equals(
+          YarnApplicationState.RUNNING)) {
+        LOG.info("AMPoolService running on"
+            + ", host=" + report.getHost()
+            + ", trackingUrl=" + report.getTrackingUrl());
+        break;
+      }
+      else if (report.getFinalApplicationStatus().equals(
+          FinalApplicationStatus.SUCCEEDED)) {
+        LOG.warn("AMPoolService shutdown");
+        return false;
+      }
+    }
+    return true;
+  }
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMFinishedEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMFinishedEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMFinishedEvent.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMFinishedEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool.manager;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+
+public class AMFinishedEvent extends AMPoolEvent {
+
+  private final ApplicationId applicationId;
+  private final FinalApplicationStatus finalApplicationStatus;
+
+  public AMFinishedEvent(ApplicationId applicationId,
+      FinalApplicationStatus finalApplicationStatus) {
+    super(AMPoolEventType.AM_FINISHED);
+    this.applicationId = applicationId;
+    this.finalApplicationStatus = finalApplicationStatus;
+  }
+
+  public ApplicationId getApplicationId() {
+    return applicationId;
+  }
+
+  public FinalApplicationStatus getFinalApplicationStatus() {
+    return finalApplicationStatus;
+  }
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMLaunchedEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMLaunchedEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMLaunchedEvent.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMLaunchedEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool.manager;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public class AMLaunchedEvent extends AMPoolEvent {
+
+  private final ApplicationId applicationId;
+
+  public AMLaunchedEvent(ApplicationId applicationId) {
+    super(AMPoolEventType.AM_LAUNCHED);
+    this.applicationId = applicationId;
+  }
+
+  public ApplicationId getApplicationId() {
+    return applicationId;
+  }
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolEvent.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool.manager;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class AMPoolEvent extends AbstractEvent<AMPoolEventType> {
+
+  public AMPoolEvent(AMPoolEventType type) {
+    super(type);
+  }
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolEventType.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolEventType.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolEventType.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool.manager;
+
+public enum AMPoolEventType {
+  LAUNCH_AM,
+  AM_FINISHED,
+  AM_LAUNCHED
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolManager.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolManager.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolManager.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/manager/AMPoolManager.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool.manager;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.tez.ampool.AMContext;
+import org.apache.tez.ampool.AMLauncher;
+import org.apache.tez.ampool.AMMonitorEvent;
+import org.apache.tez.ampool.AMMonitorEventType;
+import org.apache.tez.ampool.AMPoolConfiguration;
+import org.apache.tez.ampool.mr.MRAMLauncher;
+
+public class AMPoolManager extends CompositeService
+    implements EventHandler<AMPoolEvent> {
+
+  private static final Log LOG = LogFactory.getLog(AMPoolManager.class);
+
+  private AMLauncher amLauncher;
+
+  private final Dispatcher dispatcher;
+
+  private int numAMs;
+
+  private int maxNumAMs;
+
+  private AtomicInteger pendingLaunches;
+  private AtomicInteger failedLaunches;
+
+  private ConcurrentHashMap<ApplicationId, AMContext> appContexts;
+
+  private LinkedList<AMContext> unassignedAppContexts;
+
+  private YarnClient yarnClient;
+  private final String nmHost;
+
+  private String pollingUrl;
+  private boolean launchAMOnCompletion = true;
+  private int maxLaunchFailures = 100;
+  private final String lazyAMConfigPath;
+  private final String tmpAppDirPath;
+  private final boolean inCLIMode;
+
+  public AMPoolManager(Configuration conf, Dispatcher dispatcher, YarnClient yarnClient,
+      String nmHost, boolean inCLIMode, String tmpAppDirPath) {
+    super(AMPoolManager.class.getName());
+    this.dispatcher = dispatcher;
+    this.nmHost = nmHost;
+    appContexts = new ConcurrentHashMap<ApplicationId, AMContext>();
+    unassignedAppContexts = new LinkedList<AMContext>();
+    pendingLaunches = new AtomicInteger();
+    failedLaunches = new AtomicInteger();
+    this.yarnClient = yarnClient;
+    this.lazyAMConfigPath = conf.get(
+        AMPoolConfiguration.LAZY_AM_CONF_FILE_PATH);
+    this.inCLIMode = inCLIMode;
+    this.tmpAppDirPath = tmpAppDirPath;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    LOG.info("Initializing AMPoolManager");
+    numAMs = conf.getInt(AMPoolConfiguration.AM_POOL_SIZE,
+        AMPoolConfiguration.DEFAULT_AM_POOL_SIZE);
+    maxNumAMs = conf.getInt(AMPoolConfiguration.MAX_AM_POOL_SIZE,
+        AMPoolConfiguration.DEFAULT_MAX_AM_POOL_SIZE);
+    launchAMOnCompletion = conf.getBoolean(
+        AMPoolConfiguration.AM_LAUNCH_NEW_AM_AFTER_APP_COMPLETION,
+        AMPoolConfiguration.DEFAULT_AM_LAUNCH_NEW_AM_AFTER_APP_COMPLETION);
+    maxLaunchFailures = conf.getInt(
+        AMPoolConfiguration.MAX_AM_LAUNCH_FAILURES,
+        AMPoolConfiguration.DEFAULT_MAX_AM_LAUNCH_FAILURES);
+
+    this.pollingUrl = "http://" + nmHost
+        + ":" + conf.getInt(AMPoolConfiguration.WS_PORT,
+            AMPoolConfiguration.DEFAULT_WS_PORT)
+        + "/applications/poll/";
+
+    LOG.info("Initializing AMPoolManager"
+        + ", poolSize=" + numAMs
+        + ", maxPoolSize=" + maxNumAMs
+        + ", launchAMOnCompletion=" + launchAMOnCompletion
+        + ", pollingUrlForAMs=" + pollingUrl
+        + ", lazyMRAMConfig=" + this.lazyAMConfigPath);
+
+    if (numAMs <= 0
+        || maxNumAMs <= 0
+        || numAMs > maxNumAMs
+        || maxLaunchFailures < 0) {
+      throw new IllegalArgumentException("Invalid configuration values"
+          + " specified"
+          + ", poolSize=" + numAMs
+          + ", maxPoolSize=" + maxNumAMs
+          + ", maxLaunchFailures" + maxLaunchFailures);
+    }
+
+    try {
+      amLauncher = new MRAMLauncher(conf, dispatcher, yarnClient,
+          pollingUrl, lazyAMConfigPath, inCLIMode, tmpAppDirPath);
+      if (amLauncher instanceof Service) {
+        addService((Service) amLauncher);
+      }
+    } catch (Exception e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+      throw new RuntimeException("Could not initialize launcher");
+    }
+    super.init(conf);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void start() {
+    LOG.info("Starting AMPoolManager");
+    super.start();
+    for (int i = 0; i < numAMs; ++i) {
+      AMPoolEvent e = new AMPoolEvent(AMPoolEventType.LAUNCH_AM);
+      this.dispatcher.getEventHandler().handle(e);
+    }
+  }
+
+  @Override
+  public void stop() {
+    LOG.info("Stopping AMPoolManager");
+    super.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  public synchronized ApplicationId getAM() {
+    if (unassignedAppContexts.isEmpty()) {
+      LOG.warn("Did not find any unassigned AMs to allocate for new job"
+          + ", pendingLaunchCount=" + pendingLaunches.get()
+          + ", currentApplicationCount=" + appContexts.size()
+          + ", unassignedAMsCount=" + unassignedAppContexts.size());
+      return null;
+    }
+    AMContext popped = unassignedAppContexts.removeFirst();
+    LOG.info("Assigning new application id to new application"
+        + ", applicationId=" + popped.getApplicationId());
+    appContexts.put(popped.getApplicationId(), popped);
+    if (!launchAMOnCompletion) {
+      LOG.info("Launching new AM as assigned app to new application"
+        + ", assignedAppId=" + popped.getApplicationId());
+      AMPoolEvent e = new AMPoolEvent(AMPoolEventType.LAUNCH_AM);
+      this.dispatcher.getEventHandler().handle(e);
+    }
+    return popped.getApplicationId();
+  }
+
+  public synchronized void killAM(ApplicationId applicationId) {
+    // TODO Auto-generated method stub
+  }
+
+  public synchronized void submitApplication(SubmitApplicationRequest request) {
+    LOG.info("Received an ApplicationSubmissionContext from client for"
+        + " application="
+        + request.getApplicationSubmissionContext().getApplicationId()
+            .toString());
+    AMContext amContext = appContexts.get(
+        request.getApplicationSubmissionContext().getApplicationId());
+    amContext.setSubmissionContext(request.getApplicationSubmissionContext());
+    amContext.setApplicationSubmissionTime(System.currentTimeMillis());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public synchronized void handle(AMPoolEvent event) {
+    switch (event.getType()) {
+      case LAUNCH_AM:
+        // TODO
+        if ((pendingLaunches.get() +
+            appContexts.size() +
+            unassignedAppContexts.size()) >= maxNumAMs) {
+          LOG.info("Hitting AM pool limits, ignoring launch event"
+              + ", pendingLaunchCount=" + pendingLaunches.get()
+              + ", currentApplicationCount=" + appContexts.size()
+              + ", unassignedAMsCount=" + unassignedAppContexts.size());
+          break;
+        }
+        pendingLaunches.incrementAndGet();
+        amLauncher.launchAM();
+        break;
+      case AM_FINISHED:
+        // TODO
+        // if unassigned we should launch a new AM
+        AMFinishedEvent fEvent = (AMFinishedEvent) event;
+        ApplicationId appId = fEvent.getApplicationId();
+        boolean isManaged = isManagedApp(appId);
+        LOG.info("Received an AM finished event for application"
+            + ", application=" + appId
+            + ", isManaged=" + isManaged);
+        if (!isManaged) {
+          if (!fEvent.getFinalApplicationStatus().equals(
+              FinalApplicationStatus.SUCCEEDED)) {
+            int failures = failedLaunches.incrementAndGet();
+            LOG.info("Unassigned AM failed."
+                + ", totalFailedCount=" + failures);
+            if (failures > maxLaunchFailures) {
+              throw new RuntimeException("Getting too many failed launches"
+                  + ", failure count exceeded " + maxLaunchFailures
+                  + ", exiting");
+            }
+          }
+        }
+        removeAppId(appId);
+        int currentAMCount = pendingLaunches.get() +
+            appContexts.size() +
+            unassignedAppContexts.size();
+        if (launchAMOnCompletion
+            || !isManaged
+            || currentAMCount < numAMs) {
+          LOG.info("Launching new AM on receiving finish event"
+              + ", finishedAppId=" + appId
+              + ", launchOnCompletion=" + launchAMOnCompletion
+              + ", finishedAMWasManaged=" + isManaged
+              + ", currentAMCount=" + currentAMCount
+              + ", minAMCount=" + numAMs);
+          AMPoolEvent launchAMEvent =
+              new AMPoolEvent(AMPoolEventType.LAUNCH_AM);
+          this.dispatcher.getEventHandler().handle(launchAMEvent);
+        }
+        break;
+      case AM_LAUNCHED:
+        AMLaunchedEvent e = (AMLaunchedEvent) event;
+        AMContext amContext = new AMContext(e.getApplicationId());
+        unassignedAppContexts.add(amContext);
+        int currentCount = pendingLaunches.decrementAndGet();
+        LOG.info("AM launched for applicationId="
+            + e.getApplicationId()
+            + ", currentPending=" + currentCount);
+        AMMonitorEvent mEvent = new AMMonitorEvent(
+            AMMonitorEventType.AM_MONITOR_START,
+            e.getApplicationId());
+        this.dispatcher.getEventHandler().handle(mEvent);
+    }
+
+  }
+
+  public synchronized boolean isManagedApp(ApplicationId applicationId) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Managed app contexts size=" + appContexts.size());
+      for (ApplicationId appId : appContexts.keySet()) {
+        LOG.debug("Dumping appContexts, appId=" + appId.toString());
+      }
+    }
+    return appContexts.containsKey(applicationId);
+  }
+
+  public synchronized AMContext getAMContext(
+      ApplicationAttemptId applicationAttemptId) {
+    LOG.info("Received a getSubmissionContext request from"
+        + " applicationAttemptId=" + applicationAttemptId.toString());
+    if (appContexts.containsKey(applicationAttemptId.getApplicationId())) {
+      AMContext amContext = appContexts.get(
+          applicationAttemptId.getApplicationId());
+      LOG.info("Received a getSubmissionContext request from"
+          + " applicationAttemptId=" + applicationAttemptId.toString()
+          + ", assigning job");
+      amContext.setCurrentApplicationAttemptId(applicationAttemptId);
+      if (amContext.getSubmissionContext() != null) {
+        amContext.setJobPickUpTime(System.currentTimeMillis());
+      }
+      return amContext;
+    }
+    boolean found = false;
+    for (AMContext amContext : unassignedAppContexts) {
+      if (amContext.getApplicationId().equals(
+          applicationAttemptId.getApplicationId())) {
+        found = true;
+      }
+    }
+    if (!found) {
+      throw new RuntimeException("Could not find application id");
+    }
+    LOG.info("Received a getSubmissionContext request from"
+        + " applicationAttemptId=" + applicationAttemptId.toString()
+        + ", no job to assign");
+    return null;
+  }
+
+  public int getPendingLaunches() {
+    return pendingLaunches.get();
+  }
+
+  public int getFailedLaunches() {
+    return failedLaunches.get();
+  }
+
+  public synchronized List<AMContext> getUnassignedApplications() {
+    return Collections.unmodifiableList(unassignedAppContexts);
+  }
+
+  public synchronized Map<ApplicationId, AMContext> getApplicationsDump() {
+    return Collections.unmodifiableMap(appContexts);
+  }
+
+  private synchronized void removeAppId(ApplicationId appId) {
+    appContexts.remove(appId);
+    for (int i = 0; i < unassignedAppContexts.size(); ++i) {
+      AMContext amContext = unassignedAppContexts.get(i);
+      if (amContext.getApplicationId().equals(appId)) {
+        unassignedAppContexts.remove(i);
+        break;
+      }
+    }
+  }
+
+  public synchronized AMContext getAMContext(ApplicationId applicationId) {
+    return appContexts.get(applicationId);
+  }
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/mr/MRAMLauncher.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/mr/MRAMLauncher.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/mr/MRAMLauncher.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/mr/MRAMLauncher.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool.mr;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app2.lazy.LazyAMConfig;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.ampool.AMLauncher;
+import org.apache.tez.ampool.AMPoolConfiguration;
+import org.apache.tez.ampool.manager.AMLaunchedEvent;
+
+public class MRAMLauncher extends AbstractService
+    implements AMLauncher {
+
+  private static final Log LOG = LogFactory.getLog(MRAMLauncher.class);
+
+  private final FileContext defaultFileContext;
+
+  private final Dispatcher dispatcher;
+
+  private YarnClient yarnClient;
+
+  private ExecutorService executorService;
+
+  private Configuration conf;
+
+  private final String pollingUrl;
+
+  private String lazyAMConfigPathStr;
+
+  private final boolean inCLIMode;
+
+  private final String tmpAppDirPath;
+
+  private final String user;
+
+  private Path lazyAmConfPathOnDfs;
+
+  private FileSystem fs = null;
+
+  public MRAMLauncher(Configuration conf, Dispatcher dispatcher,
+      YarnClient yarnClient, String pollingUrl, String lazyAMConfigPath,
+      boolean inCLIMode, String tmpAppDirPath)
+      throws Exception {
+    super(MRAMLauncher.class.getName());
+    // TODO Auto-generated constructor stub
+    this.defaultFileContext = FileContext.getFileContext(conf);
+    this.dispatcher = dispatcher;
+    this.yarnClient = yarnClient;
+    this.pollingUrl = pollingUrl;
+    this.lazyAMConfigPathStr = lazyAMConfigPath;
+    this.inCLIMode = inCLIMode;
+    this.tmpAppDirPath = tmpAppDirPath;
+    this.user = UserGroupInformation.getCurrentUser().getShortUserName();
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    executorService = Executors.newCachedThreadPool();
+    this.conf = conf;
+    try {
+      fs = FileSystem.get(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    if (!inCLIMode) {
+      return;
+    }
+    Configuration lazyAMConfToUpload = new LazyAMConfig();
+    File lazyAmConfFile = new File(tmpAppDirPath,
+        LazyAMConfig.LAZY_AM_JOB_XML_FILE);
+    LOG.info("Writing contents of LazyAMConfig to local fs"
+        + ", path=" + lazyAmConfFile.getAbsolutePath());
+    lazyAmConfFile.getParentFile().mkdirs();
+    try {
+      lazyAMConfToUpload.writeXml(new FileOutputStream(lazyAmConfFile));
+
+      String stagingDir = conf.get(AMPoolConfiguration.AM_STAGING_DIR,
+          AMPoolConfiguration.DEFAULT_AM_STAGING_DIR)
+          + Path.SEPARATOR + user + Path.SEPARATOR;
+
+      lazyAmConfPathOnDfs = new Path(stagingDir,
+          LazyAMConfig.LAZY_AM_JOB_XML_FILE);
+      this.lazyAMConfigPathStr = lazyAmConfPathOnDfs.toString();
+
+      FileSystem fs = FileSystem.get(conf);
+      LOG.info("Uploading contents of LazyAMConfig to dfs"
+          + ", srcpath=" + lazyAmConfFile.getAbsolutePath()
+          + ", destpath=" + lazyAmConfPathOnDfs.toString());
+      fs.copyFromLocalFile(false, true,
+          new Path(lazyAmConfFile.getAbsolutePath()), lazyAmConfPathOnDfs);
+      fs.getFileStatus(lazyAmConfPathOnDfs);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to setup lazy am config in DFS", e);
+    }
+  }
+
+  @Override public void stop() {
+    executorService.shutdownNow();
+    if (inCLIMode && fs != null
+        && lazyAmConfPathOnDfs != null) {
+      try {
+        fs.delete(lazyAmConfPathOnDfs, true);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  @Override
+  public synchronized void launchAM() {
+    LOG.info("Launching new AM");
+    executorService.execute(
+        new MRAMLauncherThread(dispatcher, yarnClient, conf));
+  }
+
+  private class MRAMLauncherThread implements Runnable {
+
+    private final Dispatcher dispatcher;
+    private final YarnClient yarnClient;
+    private final Configuration conf;
+
+    public MRAMLauncherThread(Dispatcher dispatcher,
+        YarnClient yarnClient,
+        Configuration conf) {
+      this.dispatcher = dispatcher;
+      this.yarnClient = yarnClient;
+      this.conf = conf;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void run() {
+      ApplicationId newAppId = null;
+      try {
+        newAppId = yarnClient.getNewApplication().getApplicationId();
+          ApplicationSubmissionContext submissionContext =
+              createApplicationSubmissionContext(newAppId, conf);
+          yarnClient.submitApplication(submissionContext);
+          LOG.info("Launched new MR AM"
+              + ", applicationId=" + newAppId);
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+      if (newAppId != null) {
+        AMLaunchedEvent event = new AMLaunchedEvent(newAppId);
+        this.dispatcher.getEventHandler().handle(event);
+      }
+    }
+  }
+
+  private LocalResource createApplicationResource(FileContext fs,
+      Path p, LocalResourceType type) throws IOException {
+    LocalResource rsrc = Records.newRecord(LocalResource.class);
+    FileStatus rsrcStat = fs.getFileStatus(p);
+    rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
+        .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
+    rsrc.setSize(rsrcStat.getLen());
+    rsrc.setTimestamp(rsrcStat.getModificationTime());
+    rsrc.setType(type);
+    rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+    return rsrc;
+  }
+
+  public ApplicationSubmissionContext createApplicationSubmissionContext(
+      ApplicationId applicationId,
+      Configuration conf) throws IOException {
+
+    // Setup resource requirements
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(
+        conf.getInt(
+            AMPoolConfiguration.MR_AM_MEMORY_ALLOCATION_MB,
+            AMPoolConfiguration.DEFAULT_MR_AM_MEMORY_ALLOCATION_MB
+            )
+        );
+    // TODO for now use default of 1 for virtual cores
+    capability.setVirtualCores(
+        conf.getInt(
+            MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
+            )
+        );
+    LOG.info("LazyMRMAM capability = " + capability);
+
+    // Setup LocalResources
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+
+    if (lazyAMConfigPathStr == null
+        || lazyAMConfigPathStr.isEmpty()) {
+      LOG.error("Invalid path to lazy am config provided");
+      // TODO throw error?
+    } else {
+      LOG.info("Using lazyAMConf as a local resource for LazyAM"
+          + ", confPath=" + lazyAMConfigPathStr);
+      Path lazyAMConfPath = new Path(lazyAMConfigPathStr);
+      localResources.put(LazyAMConfig.LAZY_AM_JOB_XML_FILE,
+          createApplicationResource(defaultFileContext, lazyAMConfPath,
+              LocalResourceType.FILE));
+    }
+
+    String[] jarPaths = conf.getStrings(AMPoolConfiguration.MR_AM_JOB_JAR_PATH,
+        AMPoolConfiguration.DEFAULT_MR_AM_JOB_JAR_PATH);
+    if (jarPaths.length == 1) {
+      String jarPath = jarPaths[0];
+      if (jarPath != null) {
+        jarPath = jarPath.trim();
+        if (!jarPath.isEmpty()) {
+          Path jobJarPath = new Path(jarPath);
+          LocalResource rc = createApplicationResource(defaultFileContext,
+              jobJarPath, LocalResourceType.FILE);
+          localResources.put(MRJobConfig.JOB_JAR, rc);
+        }
+      }
+    } else {
+      for (String jarPath : jarPaths) {
+        jarPath = jarPath.trim();
+        if (jarPath == null || jarPath.isEmpty()) {
+          continue;
+        }
+        Path jobJarPath = new Path(jarPath);
+        String jarName = jobJarPath.getName();
+        LocalResource rc = createApplicationResource(defaultFileContext,
+            jobJarPath, LocalResourceType.FILE);
+        localResources.put(jarName, rc);
+      }
+    }
+
+    // Setup the command to run the AM
+    List<String> vargs = new ArrayList<String>(8);
+    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+
+    // TODO: why do we use 'conf' some places and 'jobConf' others?
+    long logSize = TaskLog.getTaskLogLength(new JobConf(conf));
+    String logLevel = conf.get(
+        MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
+    MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
+
+    // Add AM admin command opts before user command opts
+    // so that it can be overridden by user
+    vargs.add(conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
+        MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS));
+
+    vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
+        MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));
+
+    vargs.add(conf.get(AMPoolConfiguration.APPLICATION_MASTER_CLASS,
+        AMPoolConfiguration.DEFAULT_APPLICATION_MASTER_CLASS));
+    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+        Path.SEPARATOR + ApplicationConstants.STDOUT);
+    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+        Path.SEPARATOR + ApplicationConstants.STDERR);
+
+
+    Vector<String> vargsFinal = new Vector<String>(8);
+    // Final command
+    StringBuilder mergedCommand = new StringBuilder();
+    for (CharSequence str : vargs) {
+      mergedCommand.append(str).append(" ");
+    }
+    vargsFinal.add(mergedCommand.toString());
+
+    LOG.debug("Command to launch container for ApplicationMaster is : "
+        + mergedCommand);
+
+    // Setup the CLASSPATH in environment
+    // i.e. add { Hadoop jars, job jar, CWD } to classpath.
+    Map<String, String> environment = new HashMap<String, String>();
+    MRApps.setClasspath(environment, conf);
+
+    // Setup the environment variables (LD_LIBRARY_PATH, etc)
+    MRApps.setEnvFromInputString(environment,
+        conf.get(MRJobConfig.MR_AM_ENV));
+
+    environment.put(AMPoolConfiguration.LAZY_AM_POLLING_URL_ENV, pollingUrl);
+
+    // TODO distributed cache ?
+    // TODO acls ?
+
+    // Setup ContainerLaunchContext for AM container
+    ContainerLaunchContext amContainer = BuilderUtils
+        .newContainerLaunchContext(null, UserGroupInformation
+            .getCurrentUser().getShortUserName(), capability, localResources,
+            environment, vargsFinal, null, null, null);
+
+    // Set up the ApplicationSubmissionContext
+    ApplicationSubmissionContext appContext =
+       Records.newRecord(ApplicationSubmissionContext.class);
+    appContext.setApplicationId(applicationId);
+    appContext.setUser(
+        UserGroupInformation.getCurrentUser().getShortUserName());
+
+    appContext.setQueue(
+        conf.get(AMPoolConfiguration.MR_AM_QUEUE_NAME,
+        YarnConfiguration.DEFAULT_QUEUE_NAME));
+    appContext.setApplicationName("MRAMLaunchedbyAMPoolService");
+    appContext.setCancelTokensWhenComplete(
+        conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
+    appContext.setAMContainerSpec(amContainer);
+
+    return appContext;
+  }
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/AMPoolStatus.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/AMPoolStatus.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/AMPoolStatus.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/AMPoolStatus.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool.rest;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+public class AMPoolStatus {
+
+  @JsonProperty("hostname")
+  private String hostname;
+
+  @JsonProperty("pendingLaunches")
+  private int pendingLaunches;
+
+  @JsonProperty("failedLaunches")
+  private int failedLaunches;
+
+  @JsonProperty("applications")
+  private Map<String, String> applications;
+
+  @JsonProperty("unassignedApplications")
+  private List<String> unassignedApplications;
+
+  public AMPoolStatus() {
+    this.applications = new TreeMap<String, String>();
+    this.setUnassignedApplications(new ArrayList<String>());
+  }
+
+  /**
+   * @return the applications
+   */
+  public Map<String, String> getApplications() {
+    return applications;
+  }
+
+  /**
+   * @param applications the applications to set
+   */
+  public void setApplications(Map<String, String> applications) {
+    this.applications = applications;
+  }
+
+  /**
+   * @return the pendingLaunches
+   */
+  public int getPendingLaunches() {
+    return pendingLaunches;
+  }
+
+  /**
+   * @param pendingLaunches the pendingLaunches to set
+   */
+  public void setPendingLaunches(int pendingLaunches) {
+    this.pendingLaunches = pendingLaunches;
+  }
+
+  /**
+   * @return the failedLaunches
+   */
+  public int getFailedLaunches() {
+    return failedLaunches;
+  }
+
+  /**
+   * @param failedLaunches the failedLaunches to set
+   */
+  public void setFailedLaunches(int failedLaunches) {
+    this.failedLaunches = failedLaunches;
+  }
+
+  /**
+   * @return the unassignedApplications
+   */
+  public List<String> getUnassignedApplications() {
+    return unassignedApplications;
+  }
+
+  /**
+   * @param unassignedApplications the unassignedApplications to set
+   */
+  public void setUnassignedApplications(List<String> unassignedApplications) {
+    this.unassignedApplications = unassignedApplications;
+  }
+
+  public String getHostname() {
+    return hostname;
+  }
+
+  public void setHostname(String hostname) {
+    this.hostname = hostname;
+  }
+
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/AMPoolStatusService.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/AMPoolStatusService.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/AMPoolStatusService.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/AMPoolStatusService.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.ampool.rest;
+
+import java.util.Map.Entry;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.ampool.AMContext;
+import org.apache.tez.ampool.manager.AMPoolManager;
+
+@Path("/master/")
+public class AMPoolStatusService {
+
+  private static AMPoolManager manager;
+
+  public static void init(AMPoolManager manager) {
+    AMPoolStatusService.manager = manager;
+  }
+
+  @Path("status")
+  @GET
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces({MediaType.APPLICATION_JSON})
+  public AMPoolStatus getStatus(
+      @Context HttpServletRequest req) {
+    AMPoolStatus status = new AMPoolStatus();
+    status.setHostname(req.getLocalName());
+    status.setPendingLaunches(manager.getPendingLaunches());
+    status.setFailedLaunches(manager.getFailedLaunches());
+    for (AMContext amContext : manager.getUnassignedApplications()) {
+      status.getUnassignedApplications().add(
+          amContext.getApplicationId().toString());
+    }
+    for (Entry<ApplicationId, AMContext> entry :
+        manager.getApplicationsDump().entrySet()) {
+      String attemptIdStr = null;
+      if (entry.getValue().getCurrentApplicationAttemptId() != null) {
+        attemptIdStr =
+            entry.getValue().getCurrentApplicationAttemptId().toString();
+      }
+      status.getApplications().put(entry.getKey().toString(), attemptIdStr);
+    }
+    return status;
+  }
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/ApplicationPollResponse.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/ApplicationPollResponse.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/ApplicationPollResponse.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/rest/ApplicationPollResponse.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.ampool.rest;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+public class ApplicationPollResponse {
+
+  private String applicationIdStr;
+
+  private String configurationFileLocation;
+
+  private String applicationJarLocation;
+
+  private long applicationSubmissionTime;
+
+  public ApplicationPollResponse() {
+    this.applicationIdStr = null;
+  }
+
+  public ApplicationPollResponse(ApplicationId applicationId,
+      long applicationSubmissionTime) {
+    this.setApplicationIdStr(ConverterUtils.toString(applicationId));
+    this.applicationSubmissionTime = applicationSubmissionTime;
+  }
+
+  public String getApplicationIdStr() {
+    return applicationIdStr;
+  }
+
+  public void setApplicationIdStr(String applicationIdStr) {
+    this.applicationIdStr = applicationIdStr;
+  }
+
+  public String getApplicationJarLocation() {
+    return applicationJarLocation;
+  }
+
+  public void setApplicationJarLocation(String applicationJarLocation) {
+    this.applicationJarLocation = applicationJarLocation;
+  }
+
+  public String getConfigurationFileLocation() {
+    return configurationFileLocation;
+  }
+
+  public void setConfigurationFileLocation(String configurationFileLocation) {
+    this.configurationFileLocation = configurationFileLocation;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("applicationId=" + applicationIdStr
+        + ", configurationFileLocation=" + configurationFileLocation
+        + ", applicationJarLocation=" + applicationJarLocation
+        + ", applicationSubmissionTime=" + applicationSubmissionTime);
+    return sb.toString();
+  }
+
+  public long getApplicationSubmissionTime() {
+    return applicationSubmissionTime;
+  }
+
+  public void setApplicationSubmissionTime(long applicationSubmissionTime) {
+    this.applicationSubmissionTime = applicationSubmissionTime;
+  }
+
+}