Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 22:59:55 UTC

[13/28] Making maven site work.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java
new file mode 100644
index 0000000..47dd07c
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.SystemMessages;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.concurrent.Executor;
+
+/**
+ * A base implementation of {@link Service} that handles secure store token updates.
+ */
+public abstract class AbstractTwillService implements Service {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillService.class);
+
+  protected final Location applicationLocation;
+
+  protected volatile Credentials credentials;
+
+  protected AbstractTwillService(Location applicationLocation) {
+    this.applicationLocation = applicationLocation;
+  }
+
+  protected abstract Service getServiceDelegate();
+
+  /**
+   * Returns the location of the secure store, or {@code null} if either not running in secure mode or an error
+   * occurs when trying to acquire the location.
+   */
+  protected final Location getSecureStoreLocation() {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return null;
+    }
+    try {
+      return applicationLocation.append(Constants.Files.CREDENTIALS);
+    } catch (IOException e) {
+      LOG.error("Failed to create secure store location.", e);
+      return null;
+    }
+  }
+
+  /**
+   * Attempts to handle secure store update.
+   *
+   * @param message The message received
+   * @return {@code true} if the message requests a secure store update, {@code false} otherwise.
+   */
+  protected final boolean handleSecureStoreUpdate(Message message) {
+    if (!SystemMessages.SECURE_STORE_UPDATED.equals(message)) {
+      return false;
+    }
+
+    // If not in secure mode, simply ignore the message.
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return true;
+    }
+
+    try {
+      Credentials credentials = new Credentials();
+      Location location = getSecureStoreLocation();
+      DataInputStream input = new DataInputStream(new BufferedInputStream(location.getInputStream()));
+      try {
+        credentials.readTokenStorageStream(input);
+      } finally {
+        input.close();
+      }
+
+      UserGroupInformation.getCurrentUser().addCredentials(credentials);
+      this.credentials = credentials;
+
+      LOG.info("Secure store updated from {}.", location.toURI());
+
+    } catch (Throwable t) {
+      LOG.error("Failed to update secure store.", t);
+    }
+
+    return true;
+  }
+
+  @Override
+  public final ListenableFuture<State> start() {
+    return getServiceDelegate().start();
+  }
+
+  @Override
+  public final State startAndWait() {
+    return Futures.getUnchecked(start());
+  }
+
+  @Override
+  public final boolean isRunning() {
+    return getServiceDelegate().isRunning();
+  }
+
+  @Override
+  public final State state() {
+    return getServiceDelegate().state();
+  }
+
+  @Override
+  public final ListenableFuture<State> stop() {
+    return getServiceDelegate().stop();
+  }
+
+  @Override
+  public final State stopAndWait() {
+    return Futures.getUnchecked(stop());
+  }
+
+  @Override
+  public final void addListener(Listener listener, Executor executor) {
+    getServiceDelegate().addListener(listener, executor);
+  }
+}
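
For context: a concrete subclass only needs to supply the underlying Guava Service and feed incoming
system messages through handleSecureStoreUpdate(Message). The sketch below is a hypothetical, minimal
example (ExampleTwillService and its onMessage hook are illustrative names, not part of this change);
ApplicationMasterService later in this patch is the real subclass.

    package org.apache.twill.internal;

    import com.google.common.util.concurrent.AbstractIdleService;
    import com.google.common.util.concurrent.Service;
    import org.apache.twill.filesystem.Location;
    import org.apache.twill.internal.state.Message;

    // Hypothetical example, not part of this commit.
    public final class ExampleTwillService extends AbstractTwillService {

      // The actual work happens in a plain Guava Service; AbstractTwillService simply
      // forwards lifecycle calls to whatever getServiceDelegate() returns.
      private final Service delegate = new AbstractIdleService() {
        @Override
        protected void startUp() throws Exception {
          // acquire resources
        }

        @Override
        protected void shutDown() throws Exception {
          // release resources
        }
      };

      public ExampleTwillService(Location applicationLocation) {
        super(applicationLocation);
      }

      @Override
      protected Service getServiceDelegate() {
        return delegate;
      }

      // A message callback would typically route system messages here so that a
      // SECURE_STORE_UPDATED message refreshes the process's Hadoop credentials.
      void onMessage(Message message) {
        if (handleSecureStoreUpdate(message)) {
          return;
        }
        // handle application-specific messages here
      }
    }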

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
new file mode 100644
index 0000000..4ffb023
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.common.Services;
+import org.apache.twill.filesystem.HDFSLocationFactory;
+import org.apache.twill.filesystem.LocalLocationFactory;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.logging.KafkaAppender;
+import org.apache.twill.zookeeper.ZKClientService;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import ch.qos.logback.classic.util.ContextInitializer;
+import ch.qos.logback.core.joran.spi.JoranException;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.ILoggerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.InputSource;
+
+import java.io.File;
+import java.io.StringReader;
+import java.net.URI;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Base class for main classes that start a service.
+ */
+public abstract class ServiceMain {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ServiceMain.class);
+
+  static {
+    // This is to work around detection of HADOOP_HOME (HADOOP-9422)
+    if (!System.getenv().containsKey("HADOOP_HOME") && System.getProperty("hadoop.home.dir") == null) {
+      System.setProperty("hadoop.home.dir", new File("").getAbsolutePath());
+    }
+  }
+
+  protected final void doMain(final ZKClientService zkClientService,
+                              final Service service) throws ExecutionException, InterruptedException {
+    configureLogger();
+
+    final String serviceName = service.toString();
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        Services.chainStop(service, zkClientService);
+      }
+    });
+
+    // Listener for state changes of the service
+    ListenableFuture<Service.State> completion = Services.getCompletionFuture(service);
+
+    // Starts the service
+    LOG.info("Starting service {}.", serviceName);
+    Futures.getUnchecked(Services.chainStart(zkClientService, service));
+    LOG.info("Service {} started.", serviceName);
+    try {
+      completion.get();
+      LOG.info("Service {} completed.", serviceName);
+    } catch (Throwable t) {
+      LOG.warn("Exception thrown from service {}.", serviceName, t);
+      throw Throwables.propagate(t);
+    } finally {
+      ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
+      if (loggerFactory instanceof LoggerContext) {
+        ((LoggerContext) loggerFactory).stop();
+      }
+    }
+  }
+
+  protected abstract String getHostname();
+
+  protected abstract String getKafkaZKConnect();
+
+  /**
+   * Returns the {@link Location} for the application, based on the environment variable {@link EnvKeys#TWILL_APP_DIR}.
+   */
+  protected static Location createAppLocation(Configuration conf) {
+    // Note: It's a bit hacky to pick the LocationFactory based on the URI scheme; refactor it later.
+    URI appDir = URI.create(System.getenv(EnvKeys.TWILL_APP_DIR));
+
+    try {
+      if ("file".equals(appDir.getScheme())) {
+        return new LocalLocationFactory().create(appDir);
+      }
+
+      if ("hdfs".equals(appDir.getScheme())) {
+        if (UserGroupInformation.isSecurityEnabled()) {
+          return new HDFSLocationFactory(FileSystem.get(conf)).create(appDir);
+        }
+
+        String fsUser = System.getenv(EnvKeys.TWILL_FS_USER);
+        if (fsUser == null) {
+          throw new IllegalStateException("Missing environment variable " + EnvKeys.TWILL_FS_USER);
+        }
+        return new HDFSLocationFactory(FileSystem.get(FileSystem.getDefaultUri(conf), conf, fsUser)).create(appDir);
+      }
+
+      LOG.warn("Unsupported location type {}.", appDir);
+      throw new IllegalArgumentException("Unsupported location type " + appDir);
+
+    } catch (Exception e) {
+      LOG.error("Failed to create application location for {}.", appDir);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private void configureLogger() {
+    // Check if SLF4J is bound to logback in the current environment
+    ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
+    if (!(loggerFactory instanceof LoggerContext)) {
+      return;
+    }
+
+    LoggerContext context = (LoggerContext) loggerFactory;
+    context.reset();
+    JoranConfigurator configurator = new JoranConfigurator();
+    configurator.setContext(context);
+
+    try {
+      File twillLogback = new File(Constants.Files.LOGBACK_TEMPLATE);
+      if (twillLogback.exists()) {
+        configurator.doConfigure(twillLogback);
+      }
+      new ContextInitializer(context).autoConfig();
+    } catch (JoranException e) {
+      throw Throwables.propagate(e);
+    }
+    doConfigure(configurator, getLogConfig(getLoggerLevel(context.getLogger(Logger.ROOT_LOGGER_NAME))));
+  }
+
+  private void doConfigure(JoranConfigurator configurator, String config) {
+    try {
+      configurator.doConfigure(new InputSource(new StringReader(config)));
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private String getLogConfig(String rootLevel) {
+    return
+      "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
+      "<configuration>\n" +
+      "    <appender name=\"KAFKA\" class=\"" + KafkaAppender.class.getName() + "\">\n" +
+      "        <topic>" + Constants.LOG_TOPIC + "</topic>\n" +
+      "        <hostname>" + getHostname() + "</hostname>\n" +
+      "        <zookeeper>" + getKafkaZKConnect() + "</zookeeper>\n" +
+      "    </appender>\n" +
+      "    <logger name=\"org.apache.twill.internal.logging\" additivity=\"false\" />\n" +
+      "    <root level=\"" + rootLevel + "\">\n" +
+      "        <appender-ref ref=\"KAFKA\"/>\n" +
+      "    </root>\n" +
+      "</configuration>";
+  }
+
+  private String getLoggerLevel(Logger logger) {
+    if (logger instanceof ch.qos.logback.classic.Logger) {
+      return ((ch.qos.logback.classic.Logger) logger).getLevel().toString();
+    }
+
+    if (logger.isTraceEnabled()) {
+      return "TRACE";
+    }
+    if (logger.isDebugEnabled()) {
+      return "DEBUG";
+    }
+    if (logger.isInfoEnabled()) {
+      return "INFO";
+    }
+    if (logger.isWarnEnabled()) {
+      return "WARN";
+    }
+    if (logger.isErrorEnabled()) {
+      return "ERROR";
+    }
+    return "OFF";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
new file mode 100644
index 0000000..028df7b
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+/**
+ * Represents data being stored in the live node of the application master.
+ */
+public final class ApplicationMasterLiveNodeData {
+
+  private final int appId;
+  private final long appIdClusterTime;
+  private final String containerId;
+
+  public ApplicationMasterLiveNodeData(int appId, long appIdClusterTime, String containerId) {
+    this.appId = appId;
+    this.appIdClusterTime = appIdClusterTime;
+    this.containerId = containerId;
+  }
+
+  public int getAppId() {
+    return appId;
+  }
+
+  public long getAppIdClusterTime() {
+    return appIdClusterTime;
+  }
+
+  public String getContainerId() {
+    return containerId;
+  }
+}
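
As a rough illustration (not part of this change): the live node data is published as JSON.
ApplicationMasterService below serializes it with Gson in createLiveNodeDataSupplier(), and a round
trip looks like this sketch; the id, timestamp and container id values are made up.

    import com.google.gson.Gson;
    import org.apache.twill.internal.appmaster.ApplicationMasterLiveNodeData;

    public final class LiveNodeDataExample {
      public static void main(String[] args) {
        Gson gson = new Gson();

        // Values are made up for illustration.
        ApplicationMasterLiveNodeData liveNode =
          new ApplicationMasterLiveNodeData(1, System.currentTimeMillis(), "container_1386889001_0001_01_000001");

        // Produces something like {"appId":1,"appIdClusterTime":...,"containerId":"container_..."}
        String json = gson.toJson(liveNode);

        // Gson reconstructs the immutable value object directly from the JSON fields.
        ApplicationMasterLiveNodeData restored = gson.fromJson(json, ApplicationMasterLiveNodeData.class);
        System.out.println(json + " -> appId=" + restored.getAppId());
      }
    }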

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
new file mode 100644
index 0000000..b34a7a2
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.ServiceMain;
+import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Main class for launching {@link ApplicationMasterService}.
+ */
+public final class ApplicationMasterMain extends ServiceMain {
+
+  private final String kafkaZKConnect;
+
+  private ApplicationMasterMain(String kafkaZKConnect) {
+    this.kafkaZKConnect = kafkaZKConnect;
+  }
+
+  /**
+   * Starts the application master.
+   */
+  public static void main(String[] args) throws Exception {
+    String zkConnect = System.getenv(EnvKeys.TWILL_ZK_CONNECT);
+    File twillSpec = new File(Constants.Files.TWILL_SPEC);
+    RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
+
+    ZKClientService zkClientService =
+      ZKClientServices.delegate(
+        ZKClients.reWatchOnExpire(
+          ZKClients.retryOnFailure(
+            ZKClientService.Builder.of(zkConnect).build(),
+            RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
+
+    Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
+    Service service = new ApplicationMasterService(runId, zkClientService, twillSpec,
+                                                   new VersionDetectYarnAMClientFactory(conf), createAppLocation(conf));
+    new ApplicationMasterMain(String.format("%s/%s/kafka", zkConnect, runId.getId())).doMain(zkClientService, service);
+  }
+
+  @Override
+  protected String getHostname() {
+    try {
+      return InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (UnknownHostException e) {
+      return "unknown";
+    }
+  }
+
+  @Override
+  protected String getKafkaZKConnect() {
+    return kafkaZKConnect;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
new file mode 100644
index 0000000..b51bb63
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.yarn.AbstractYarnProcessLauncher;
+import org.apache.twill.internal.yarn.YarnLaunchContext;
+import org.apache.twill.internal.yarn.YarnUtils;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.Map;
+
+/**
+ * A {@link org.apache.twill.internal.ProcessLauncher} for launching the Application Master from the client.
+ */
+public final class ApplicationMasterProcessLauncher extends AbstractYarnProcessLauncher<ApplicationId> {
+
+  private final ApplicationSubmitter submitter;
+
+  public ApplicationMasterProcessLauncher(ApplicationId appId, ApplicationSubmitter submitter) {
+    super(appId);
+    this.submitter = submitter;
+  }
+
+  @Override
+  protected boolean useArchiveSuffix() {
+    return false;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext) {
+    final ApplicationId appId = getContainerInfo();
+
+    // Set the resource requirement for AM
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(Constants.APP_MASTER_MEMORY_MB);
+    YarnUtils.setVirtualCores(capability, 1);
+
+    // Put in extra environments
+    Map<String, String> env = ImmutableMap.<String, String>builder()
+      .putAll(launchContext.getEnvironment())
+      .put(EnvKeys.YARN_APP_ID, Integer.toString(appId.getId()))
+      .put(EnvKeys.YARN_APP_ID_CLUSTER_TIME, Long.toString(appId.getClusterTimestamp()))
+      .put(EnvKeys.YARN_APP_ID_STR, appId.toString())
+      .put(EnvKeys.YARN_CONTAINER_MEMORY_MB, Integer.toString(Constants.APP_MASTER_MEMORY_MB))
+      .put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, Integer.toString(YarnUtils.getVirtualCores(capability)))
+      .build();
+
+    launchContext.setEnvironment(env);
+    return (ProcessController<R>) submitter.submit(launchContext, capability);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
new file mode 100644
index 0000000..51c8503
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -0,0 +1,799 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.api.Command;
+import org.apache.twill.api.EventHandler;
+import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.common.Threads;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.AbstractTwillService;
+import org.apache.twill.internal.Configs;
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.DefaultTwillRunResources;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.TwillContainerLauncher;
+import org.apache.twill.internal.ZKServiceDecorator;
+import org.apache.twill.internal.json.LocalFileCodec;
+import org.apache.twill.internal.json.TwillSpecificationAdapter;
+import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
+import org.apache.twill.internal.logging.Loggings;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.MessageCallback;
+import org.apache.twill.internal.utils.Instances;
+import org.apache.twill.internal.utils.Networks;
+import org.apache.twill.internal.yarn.YarnAMClient;
+import org.apache.twill.internal.yarn.YarnAMClientFactory;
+import org.apache.twill.internal.yarn.YarnContainerInfo;
+import org.apache.twill.internal.yarn.YarnContainerStatus;
+import org.apache.twill.internal.yarn.YarnUtils;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClients;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.Sets;
+import com.google.common.io.CharStreams;
+import com.google.common.io.Files;
+import com.google.common.io.InputSupplier;
+import com.google.common.reflect.TypeToken;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.net.URI;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The YARN application master service: provisions containers, launches runnables and relays control messages.
+ */
+public final class ApplicationMasterService extends AbstractTwillService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ApplicationMasterService.class);
+
+  // Copied from org.apache.hadoop.yarn.security.AMRMTokenIdentifier.KIND_NAME since it's missing in Hadoop-2.0
+  private static final Text AMRM_TOKEN_KIND_NAME = new Text("YARN_AM_RM_TOKEN");
+
+  private final RunId runId;
+  private final ZKClient zkClient;
+  private final TwillSpecification twillSpec;
+  private final ApplicationMasterLiveNodeData amLiveNode;
+  private final ZKServiceDecorator serviceDelegate;
+  private final RunningContainers runningContainers;
+  private final ExpectedContainers expectedContainers;
+  private final TrackerService trackerService;
+  private final YarnAMClient amClient;
+  private final String jvmOpts;
+  private final int reservedMemory;
+  private final EventHandler eventHandler;
+  private final Location applicationLocation;
+
+  private EmbeddedKafkaServer kafkaServer;
+  private Queue<RunnableContainerRequest> runnableContainerRequests;
+  private ExecutorService instanceChangeExecutor;
+
+  public ApplicationMasterService(RunId runId, ZKClient zkClient, File twillSpecFile,
+                                  YarnAMClientFactory amClientFactory, Location applicationLocation) throws Exception {
+    super(applicationLocation);
+
+    this.runId = runId;
+    this.twillSpec = TwillSpecificationAdapter.create().fromJson(twillSpecFile);
+    this.zkClient = zkClient;
+    this.applicationLocation = applicationLocation;
+    this.amClient = amClientFactory.create();
+    this.credentials = createCredentials();
+    this.jvmOpts = loadJvmOptions();
+    this.reservedMemory = getReservedMemory();
+
+    amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv(EnvKeys.YARN_APP_ID)),
+                                                   Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)),
+                                                   amClient.getContainerId().toString());
+
+    serviceDelegate = new ZKServiceDecorator(zkClient, runId, createLiveNodeDataSupplier(),
+                                             new ServiceDelegate(), new Runnable() {
+      @Override
+      public void run() {
+        amClient.stopAndWait();
+      }
+    });
+    expectedContainers = initExpectedContainers(twillSpec);
+    runningContainers = initRunningContainers(amClient.getContainerId(), amClient.getHost());
+    trackerService = new TrackerService(runningContainers.getResourceReport(), amClient.getHost());
+    eventHandler = createEventHandler(twillSpec);
+  }
+
+  private String loadJvmOptions() throws IOException {
+    final File jvmOptsFile = new File(Constants.Files.JVM_OPTIONS);
+    if (!jvmOptsFile.exists()) {
+      return "";
+    }
+
+    return CharStreams.toString(new InputSupplier<Reader>() {
+      @Override
+      public Reader getInput() throws IOException {
+        return new FileReader(jvmOptsFile);
+      }
+    });
+  }
+
+  private int getReservedMemory() {
+    String value = System.getenv(EnvKeys.TWILL_RESERVED_MEMORY_MB);
+    if (value == null) {
+      return Configs.Defaults.JAVA_RESERVED_MEMORY_MB;
+    }
+    try {
+      return Integer.parseInt(value);
+    } catch (Exception e) {
+      return Configs.Defaults.JAVA_RESERVED_MEMORY_MB;
+    }
+  }
+
+  private EventHandler createEventHandler(TwillSpecification twillSpec) {
+    try {
+      // Should be loadable by this class's ClassLoader, as they are packaged in the same jar.
+      EventHandlerSpecification handlerSpec = twillSpec.getEventHandler();
+
+      Class<?> handlerClass = getClass().getClassLoader().loadClass(handlerSpec.getClassName());
+      Preconditions.checkArgument(EventHandler.class.isAssignableFrom(handlerClass),
+                                  "Class %s does not implement %s",
+                                  handlerClass, EventHandler.class.getName());
+      return Instances.newInstance((Class<? extends EventHandler>) handlerClass);
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private Supplier<? extends JsonElement> createLiveNodeDataSupplier() {
+    return new Supplier<JsonElement>() {
+      @Override
+      public JsonElement get() {
+        return new Gson().toJsonTree(amLiveNode);
+      }
+    };
+  }
+
+  private RunningContainers initRunningContainers(ContainerId appMasterContainerId,
+                                                  String appMasterHost) throws Exception {
+    TwillRunResources appMasterResources = new DefaultTwillRunResources(
+      0,
+      appMasterContainerId.toString(),
+      Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES)),
+      Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_MEMORY_MB)),
+      appMasterHost);
+    String appId = appMasterContainerId.getApplicationAttemptId().getApplicationId().toString();
+    return new RunningContainers(appId, appMasterResources);
+  }
+
+  private ExpectedContainers initExpectedContainers(TwillSpecification twillSpec) {
+    Map<String, Integer> expectedCounts = Maps.newHashMap();
+    for (RuntimeSpecification runtimeSpec : twillSpec.getRunnables().values()) {
+      expectedCounts.put(runtimeSpec.getName(), runtimeSpec.getResourceSpecification().getInstances());
+    }
+    return new ExpectedContainers(expectedCounts);
+  }
+
+  private void doStart() throws Exception {
+    LOG.info("Start application master with spec: " + TwillSpecificationAdapter.create().toJson(twillSpec));
+
+    // Initialize the event handler. If it fails, it fails the application.
+    eventHandler.initialize(new BasicEventHandlerContext(twillSpec.getEventHandler()));
+
+    instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger"));
+
+    kafkaServer = new EmbeddedKafkaServer(new File(Constants.Files.KAFKA), generateKafkaConfig());
+
+    // Must start the tracker before starting the AMClient
+    LOG.info("Starting application master tracker server");
+    trackerService.startAndWait();
+    URL trackerUrl = trackerService.getUrl();
+    LOG.info("Started application master tracker server on " + trackerUrl);
+
+    amClient.setTracker(trackerService.getBindAddress(), trackerUrl);
+    amClient.startAndWait();
+
+    // Create ZK paths for runnables and the kafka logging service
+    Futures.allAsList(ImmutableList.of(
+      zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT),
+      zkClient.create("/" + runId.getId() + "/kafka", null, CreateMode.PERSISTENT))
+    ).get();
+
+    // Starts kafka server
+    LOG.info("Starting kafka server");
+
+    kafkaServer.startAndWait();
+    LOG.info("Kafka server started");
+
+    runnableContainerRequests = initContainerRequests();
+  }
+
+  private void doStop() throws Exception {
+    Thread.interrupted();     // This is just to clear the interrupt flag
+
+    LOG.info("Stop application master with spec: {}", TwillSpecificationAdapter.create().toJson(twillSpec));
+
+    try {
+      // Call the event handler's destroy. If there is an error, only log it; do not affect the stop sequence.
+      eventHandler.destroy();
+    } catch (Throwable t) {
+      LOG.warn("Exception when calling {}.destroy()", twillSpec.getEventHandler().getClassName(), t);
+    }
+
+    instanceChangeExecutor.shutdownNow();
+
+    // For checking if all containers are stopped.
+    final Set<String> ids = Sets.newHashSet(runningContainers.getContainerIds());
+    YarnAMClient.AllocateHandler handler = new YarnAMClient.AllocateHandler() {
+      @Override
+      public void acquired(List<ProcessLauncher<YarnContainerInfo>> launchers) {
+        // no-op
+      }
+
+      @Override
+      public void completed(List<YarnContainerStatus> completed) {
+        for (YarnContainerStatus status : completed) {
+          ids.remove(status.getContainerId());
+        }
+      }
+    };
+
+    runningContainers.stopAll();
+
+    // Poll for 5 seconds to wait for containers to stop.
+    int count = 0;
+    while (!ids.isEmpty() && count++ < 5) {
+      amClient.allocate(0.0f, handler);
+      TimeUnit.SECONDS.sleep(1);
+    }
+
+    LOG.info("Stopping application master tracker server");
+    try {
+      trackerService.stopAndWait();
+      LOG.info("Stopped application master tracker server");
+    } catch (Exception e) {
+      LOG.error("Failed to stop tracker service.", e);
+    } finally {
+      try {
+        // App location cleanup
+        cleanupDir(URI.create(System.getenv(EnvKeys.TWILL_APP_DIR)));
+        Loggings.forceFlush();
+        // Sleep a short while to let kafka clients have a chance to fetch the logs
+        TimeUnit.SECONDS.sleep(1);
+      } finally {
+        kafkaServer.stopAndWait();
+        LOG.info("Kafka server stopped");
+      }
+    }
+  }
+
+  private void cleanupDir(URI appDir) {
+    try {
+      if (applicationLocation.delete(true)) {
+        LOG.info("Application directory deleted: {}", appDir);
+      } else {
+        LOG.warn("Failed to cleanup directory {}.", appDir);
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception while cleaning up directory {}.", appDir, e);
+    }
+  }
+
+
+  private void doRun() throws Exception {
+    // The main loop
+    Map.Entry<Resource, ? extends Collection<RuntimeSpecification>> currentRequest = null;
+    final Queue<ProvisionRequest> provisioning = Lists.newLinkedList();
+
+    YarnAMClient.AllocateHandler allocateHandler = new YarnAMClient.AllocateHandler() {
+      @Override
+      public void acquired(List<ProcessLauncher<YarnContainerInfo>> launchers) {
+        launchRunnable(launchers, provisioning);
+      }
+
+      @Override
+      public void completed(List<YarnContainerStatus> completed) {
+        handleCompleted(completed);
+      }
+    };
+
+    long nextTimeoutCheck = System.currentTimeMillis() + Constants.PROVISION_TIMEOUT;
+    while (isRunning()) {
+      // Call allocate. It has to be made first in order to get cluster resource availability.
+      amClient.allocate(0.0f, allocateHandler);
+
+      // Look for container requests.
+      if (provisioning.isEmpty() && runnableContainerRequests.isEmpty() && runningContainers.isEmpty()) {
+        LOG.info("All containers completed. Shutting down application master.");
+        break;
+      }
+
+      // If nothing is in provisioning and there is no pending request, move to the next one
+      while (provisioning.isEmpty() && currentRequest == null && !runnableContainerRequests.isEmpty()) {
+        currentRequest = runnableContainerRequests.peek().takeRequest();
+        if (currentRequest == null) {
+          // All the different resource request types from the current order are done, move to the next one
+          // TODO: Need to handle order type as well
+          runnableContainerRequests.poll();
+        }
+      }
+      // Nothing in provisioning; make the next batch of provision requests
+      if (provisioning.isEmpty() && currentRequest != null) {
+        addContainerRequests(currentRequest.getKey(), currentRequest.getValue(), provisioning);
+        currentRequest = null;
+      }
+
+      nextTimeoutCheck = checkProvisionTimeout(nextTimeoutCheck);
+
+      if (isRunning()) {
+        TimeUnit.SECONDS.sleep(1);
+      }
+    }
+  }
+
+  /**
+   * Handles containers that have completed.
+   */
+  private void handleCompleted(List<YarnContainerStatus> completedContainersStatuses) {
+    Multiset<String> restartRunnables = HashMultiset.create();
+    for (YarnContainerStatus status : completedContainersStatuses) {
+      LOG.info("Container {} completed with {}:{}.",
+               status.getContainerId(), status.getState(), status.getDiagnostics());
+      runningContainers.handleCompleted(status, restartRunnables);
+    }
+
+    for (Multiset.Entry<String> entry : restartRunnables.entrySet()) {
+      LOG.info("Re-request container for {} with {} instances.", entry.getElement(), entry.getCount());
+      for (int i = 0; i < entry.getCount(); i++) {
+        runnableContainerRequests.add(createRunnableContainerRequest(entry.getElement()));
+      }
+    }
+
+    // For all runnables that need to re-request containers, update the expected count timestamp
+    // so that the EventHandler is triggered with the right expiration timestamp.
+    expectedContainers.updateRequestTime(restartRunnables.elementSet());
+  }
+
+  /**
+   * Checks for container provision timeouts and invokes the eventHandler if necessary.
+   *
+   * @return the timestamp for the next time this method needs to be called.
+   */
+  private long checkProvisionTimeout(long nextTimeoutCheck) {
+    if (System.currentTimeMillis() < nextTimeoutCheck) {
+      return nextTimeoutCheck;
+    }
+
+    // Invoke event handler for provision request timeout
+    Map<String, ExpectedContainers.ExpectedCount> expiredRequests = expectedContainers.getAll();
+    Map<String, Integer> runningCounts = runningContainers.countAll();
+
+    List<EventHandler.TimeoutEvent> timeoutEvents = Lists.newArrayList();
+    for (Map.Entry<String, ExpectedContainers.ExpectedCount> entry : expiredRequests.entrySet()) {
+      String runnableName = entry.getKey();
+      ExpectedContainers.ExpectedCount expectedCount = entry.getValue();
+      int runningCount = runningCounts.containsKey(runnableName) ? runningCounts.get(runnableName) : 0;
+      if (expectedCount.getCount() != runningCount) {
+        timeoutEvents.add(new EventHandler.TimeoutEvent(runnableName, expectedCount.getCount(),
+                                                                   runningCount, expectedCount.getTimestamp()));
+      }
+    }
+
+    if (!timeoutEvents.isEmpty()) {
+      try {
+        EventHandler.TimeoutAction action = eventHandler.launchTimeout(timeoutEvents);
+        if (action.getTimeout() < 0) {
+          // Abort application
+          stop();
+        } else {
+          return nextTimeoutCheck + action.getTimeout();
+        }
+      } catch (Throwable t) {
+        LOG.warn("Exception when calling EventHandler. Ignoring the result.", t);
+      }
+    }
+    return nextTimeoutCheck + Constants.PROVISION_TIMEOUT;
+  }
+
+  private Credentials createCredentials() {
+    Credentials credentials = new Credentials();
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return credentials;
+    }
+
+    try {
+      credentials.addAll(UserGroupInformation.getCurrentUser().getCredentials());
+
+      // Remove the AM->RM tokens
+      Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+      while (iter.hasNext()) {
+        Token<?> token = iter.next();
+        if (token.getKind().equals(AMRM_TOKEN_KIND_NAME)) {
+          iter.remove();
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to get current user. No credentials will be provided to containers.", e);
+    }
+
+    return credentials;
+  }
+
+  private Queue<RunnableContainerRequest> initContainerRequests() {
+    // Stores container requests in order.
+    Queue<RunnableContainerRequest> requests = Lists.newLinkedList();
+    // For each order in the twillSpec, create container request for each runnable.
+    for (TwillSpecification.Order order : twillSpec.getOrders()) {
+      // Group container requests based on resource requirement.
+      ImmutableMultimap.Builder<Resource, RuntimeSpecification> builder = ImmutableMultimap.builder();
+      for (String runnableName : order.getNames()) {
+        RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
+        Resource capability = createCapability(runtimeSpec.getResourceSpecification());
+        builder.put(capability, runtimeSpec);
+      }
+      requests.add(new RunnableContainerRequest(order.getType(), builder.build()));
+    }
+    return requests;
+  }
+
+  /**
+   * Adds container requests with the given resource capability for each runtime.
+   */
+  private void addContainerRequests(Resource capability,
+                                    Collection<RuntimeSpecification> runtimeSpecs,
+                                    Queue<ProvisionRequest> provisioning) {
+    for (RuntimeSpecification runtimeSpec : runtimeSpecs) {
+      String name = runtimeSpec.getName();
+      int newContainers = expectedContainers.getExpected(name) - runningContainers.count(name);
+      if (newContainers > 0) {
+        // TODO: Allow user to set priority?
+        LOG.info("Request {} container with capability {}", newContainers, capability);
+        String requestId = amClient.addContainerRequest(capability, newContainers).setPriority(0).apply();
+        provisioning.add(new ProvisionRequest(runtimeSpec, requestId, newContainers));
+      }
+    }
+  }
+
+  /**
+   * Launches runnables in the provisioned containers.
+   */
+  private void launchRunnable(List<ProcessLauncher<YarnContainerInfo>> launchers,
+                              Queue<ProvisionRequest> provisioning) {
+    for (ProcessLauncher<YarnContainerInfo> processLauncher : launchers) {
+      LOG.info("Got container {}", processLauncher.getContainerInfo().getId());
+      ProvisionRequest provisionRequest = provisioning.peek();
+      if (provisionRequest == null) {
+        continue;
+      }
+
+      String runnableName = provisionRequest.getRuntimeSpec().getName();
+      LOG.info("Starting runnable {} with {}", runnableName, processLauncher);
+
+      int containerCount = expectedContainers.getExpected(runnableName);
+
+      ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(
+        ImmutableMap.<String, String>builder()
+          .put(EnvKeys.TWILL_APP_DIR, System.getenv(EnvKeys.TWILL_APP_DIR))
+          .put(EnvKeys.TWILL_FS_USER, System.getenv(EnvKeys.TWILL_FS_USER))
+          .put(EnvKeys.TWILL_APP_RUN_ID, runId.getId())
+          .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName())
+          .put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString())
+          .put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect())
+          .build()
+        , getLocalizeFiles(), credentials
+      );
+
+      TwillContainerLauncher launcher = new TwillContainerLauncher(
+        twillSpec.getRunnables().get(runnableName), launchContext,
+        ZKClients.namespace(zkClient, getZKNamespace(runnableName)),
+        containerCount, jvmOpts, reservedMemory, getSecureStoreLocation());
+
+      runningContainers.start(runnableName, processLauncher.getContainerInfo(), launcher);
+
+      // Need to call complete to work around a bug in YARN AMRMClient
+      if (provisionRequest.containerAcquired()) {
+        amClient.completeContainerRequest(provisionRequest.getRequestId());
+      }
+
+      if (expectedContainers.getExpected(runnableName) == runningContainers.count(runnableName)) {
+        LOG.info("Runnable " + runnableName + " fully provisioned with " + containerCount + " instances.");
+        provisioning.poll();
+      }
+    }
+  }
+
+  private List<LocalFile> getLocalizeFiles() {
+    try {
+      Reader reader = Files.newReader(new File(Constants.Files.LOCALIZE_FILES), Charsets.UTF_8);
+      try {
+        return new GsonBuilder().registerTypeAdapter(LocalFile.class, new LocalFileCodec())
+                                .create().fromJson(reader, new TypeToken<List<LocalFile>>() {}.getType());
+      } finally {
+        reader.close();
+      }
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private String getZKNamespace(String runnableName) {
+    return String.format("/%s/runnables/%s", runId.getId(), runnableName);
+  }
+
+  private String getKafkaZKConnect() {
+    return String.format("%s/%s/kafka", zkClient.getConnectString(), runId.getId());
+  }
+
+  private Properties generateKafkaConfig() {
+    int port = Networks.getRandomPort();
+    Preconditions.checkState(port > 0, "Failed to get random port.");
+
+    Properties prop = new Properties();
+    prop.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
+    prop.setProperty("zk.connect", getKafkaZKConnect());
+    prop.setProperty("num.threads", "8");
+    prop.setProperty("port", Integer.toString(port));
+    prop.setProperty("log.flush.interval", "10000");
+    prop.setProperty("max.socket.request.bytes", "104857600");
+    prop.setProperty("log.cleanup.interval.mins", "1");
+    prop.setProperty("log.default.flush.scheduler.interval.ms", "1000");
+    prop.setProperty("zk.connectiontimeout.ms", "1000000");
+    prop.setProperty("socket.receive.buffer", "1048576");
+    prop.setProperty("enable.zookeeper", "true");
+    prop.setProperty("log.retention.hours", "24");
+    prop.setProperty("brokerid", "0");
+    prop.setProperty("socket.send.buffer", "1048576");
+    prop.setProperty("num.partitions", "1");
+    prop.setProperty("log.file.size", "536870912");
+    prop.setProperty("log.default.flush.interval.ms", "1000");
+    return prop;
+  }
+
+  private ListenableFuture<String> processMessage(final String messageId, Message message) {
+    LOG.debug("Message received: {} {}.", messageId, message);
+
+    SettableFuture<String> result = SettableFuture.create();
+    Runnable completion = getMessageCompletion(messageId, result);
+
+    if (handleSecureStoreUpdate(message)) {
+      runningContainers.sendToAll(message, completion);
+      return result;
+    }
+
+    if (handleSetInstances(message, completion)) {
+      return result;
+    }
+
+    // Replicate messages to all runnables
+    if (message.getScope() == Message.Scope.ALL_RUNNABLE) {
+      runningContainers.sendToAll(message, completion);
+      return result;
+    }
+
+    // Replicate message to a particular runnable.
+    if (message.getScope() == Message.Scope.RUNNABLE) {
+      runningContainers.sendToRunnable(message.getRunnableName(), message, completion);
+      return result;
+    }
+
+    LOG.info("Message ignored. {}", message);
+    return Futures.immediateFuture(messageId);
+  }
+
+  /**
+   * Attempts to change the number of running instances.
+   * @return {@code true} if the message requests a change in the number of running instances of a runnable,
+   *         {@code false} otherwise.
+   */
+  private boolean handleSetInstances(final Message message, final Runnable completion) {
+    if (message.getType() != Message.Type.SYSTEM || message.getScope() != Message.Scope.RUNNABLE) {
+      return false;
+    }
+
+    Command command = message.getCommand();
+    Map<String, String> options = command.getOptions();
+    if (!"instances".equals(command.getCommand()) || !options.containsKey("count")) {
+      return false;
+    }
+
+    final String runnableName = message.getRunnableName();
+    if (runnableName == null || runnableName.isEmpty() || !twillSpec.getRunnables().containsKey(runnableName)) {
+      LOG.info("Unknown runnable {}", runnableName);
+      return false;
+    }
+
+    final int newCount = Integer.parseInt(options.get("count"));
+    final int oldCount = expectedContainers.getExpected(runnableName);
+
+    LOG.info("Received change instances request for {}, from {} to {}.", runnableName, oldCount, newCount);
+
+    if (newCount == oldCount) {   // Nothing to do, simply complete the request.
+      completion.run();
+      return true;
+    }
+
+    instanceChangeExecutor.execute(createSetInstanceRunnable(message, completion, oldCount, newCount));
+    return true;
+  }
+
+  /**
+   * Creates a Runnable that executes a change-instance request.
+   */
+  private Runnable createSetInstanceRunnable(final Message message, final Runnable completion,
+                                             final int oldCount, final int newCount) {
+    return new Runnable() {
+      @Override
+      public void run() {
+        final String runnableName = message.getRunnableName();
+
+        LOG.info("Processing change instance request for {}, from {} to {}.", runnableName, oldCount, newCount);
+        try {
+          // Wait until running container count is the same as old count
+          runningContainers.waitForCount(runnableName, oldCount);
+          LOG.info("Confirmed {} containers running for {}.", oldCount, runnableName);
+
+          expectedContainers.setExpected(runnableName, newCount);
+
+          try {
+            if (newCount < oldCount) {
+              // Shutdown some running containers
+              for (int i = 0; i < oldCount - newCount; i++) {
+                runningContainers.removeLast(runnableName);
+              }
+            } else {
+              // Increase the number of instances
+              runnableContainerRequests.add(createRunnableContainerRequest(runnableName));
+            }
+          } finally {
+            runningContainers.sendToRunnable(runnableName, message, completion);
+            LOG.info("Change instances request completed. From {} to {}.", oldCount, newCount);
+          }
+        } catch (InterruptedException e) {
+          // If the wait is being interrupted, discard the message.
+          completion.run();
+        }
+      }
+    };
+  }
+
+  private RunnableContainerRequest createRunnableContainerRequest(final String runnableName) {
+    // Find the current order of the given runnable in order to create a RunnableContainerRequest.
+    TwillSpecification.Order order = Iterables.find(twillSpec.getOrders(), new Predicate<TwillSpecification.Order>() {
+      @Override
+      public boolean apply(TwillSpecification.Order input) {
+        return (input.getNames().contains(runnableName));
+      }
+    });
+
+    RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
+    Resource capability = createCapability(runtimeSpec.getResourceSpecification());
+    return new RunnableContainerRequest(order.getType(), ImmutableMultimap.of(capability, runtimeSpec));
+  }
+
+  private Runnable getMessageCompletion(final String messageId, final SettableFuture<String> future) {
+    return new Runnable() {
+      @Override
+      public void run() {
+        future.set(messageId);
+      }
+    };
+  }
+
+  private Resource createCapability(ResourceSpecification resourceSpec) {
+    Resource capability = Records.newRecord(Resource.class);
+
+    if (!YarnUtils.setVirtualCores(capability, resourceSpec.getVirtualCores())) {
+      LOG.debug("Virtual cores limit not supported.");
+    }
+
+    capability.setMemory(resourceSpec.getMemorySize());
+    return capability;
+  }
+
+  @Override
+  protected Service getServiceDelegate() {
+    return serviceDelegate;
+  }
+
+  /**
+   * A private class for the service lifecycle. It's done this way so that a {@link ZKServiceDecorator} can
+   * wrap around this to reflect the status in ZK.
+   */
+  private final class ServiceDelegate extends AbstractExecutionThreadService implements MessageCallback {
+
+    private volatile Thread runThread;
+
+    @Override
+    protected void run() throws Exception {
+      runThread = Thread.currentThread();
+      try {
+        doRun();
+      } catch (InterruptedException e) {
+        // It's ok to get interrupted exception, as it's a signal to stop
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+      doStart();
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      doStop();
+    }
+
+    @Override
+    protected void triggerShutdown() {
+      Thread runThread = this.runThread;
+      if (runThread != null) {
+        runThread.interrupt();
+      }
+    }
+
+    @Override
+    public ListenableFuture<String> onReceived(String messageId, Message message) {
+      return processMessage(messageId, message);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
new file mode 100644
index 0000000..931c5ef
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.yarn.YarnApplicationReport;
+import org.apache.twill.internal.yarn.YarnLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * Interface for submitting a new application to run.
+ */
+public interface ApplicationSubmitter {
+
+  ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability);
+}
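
An ApplicationSubmitter implementation is expected to hand the prepared launch context and resource
capability to YARN and return a controller for the submitted application. Purely for illustration,
a stub shaped like the interface might look as follows, assuming ProcessController exposes only the
getReport()/cancel() pair used elsewhere in this patch:

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.twill.internal.ProcessController;
import org.apache.twill.internal.yarn.YarnApplicationReport;
import org.apache.twill.internal.yarn.YarnLaunchContext;

// Illustrative stub only: a real implementation would submit the application through the YARN client.
final class NoOpApplicationSubmitter implements ApplicationSubmitter {

  @Override
  public ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability) {
    return new ProcessController<YarnApplicationReport>() {
      @Override
      public YarnApplicationReport getReport() {
        return null;  // no application report available from this stub
      }

      @Override
      public void cancel() {
        // nothing to cancel in this stub
      }
    };
  }
}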

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java
new file mode 100644
index 0000000..1769910
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.api.EventHandlerContext;
+import org.apache.twill.api.EventHandlerSpecification;
+
+/**
+ * A basic {@link EventHandlerContext} implementation that simply carries the {@link EventHandlerSpecification}.
+ */
+final class BasicEventHandlerContext implements EventHandlerContext {
+
+  private final EventHandlerSpecification specification;
+
+  BasicEventHandlerContext(EventHandlerSpecification specification) {
+    this.specification = specification;
+  }
+
+  @Override
+  public EventHandlerSpecification getSpecification() {
+    return specification;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
new file mode 100644
index 0000000..f4ebbd0
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+/**
+ * This class holds information about the expected container count for each runnable. It also
+ * keeps track of the timestamp at which the expected count was last updated.
+ */
+final class ExpectedContainers {
+
+  private final Map<String, ExpectedCount> expectedCounts;
+
+  ExpectedContainers(Map<String, Integer> expected) {
+    expectedCounts = Maps.newHashMap();
+    long now = System.currentTimeMillis();
+
+    for (Map.Entry<String, Integer> entry : expected.entrySet()) {
+      expectedCounts.put(entry.getKey(), new ExpectedCount(entry.getValue(), now));
+    }
+  }
+
+  synchronized void setExpected(String runnable, int expected) {
+    expectedCounts.put(runnable, new ExpectedCount(expected, System.currentTimeMillis()));
+  }
+
+  /**
+   * Updates the {@link ExpectedCount} timestamp of the given runnables to the current time.
+   * @param runnables List of runnable names.
+   */
+  synchronized void updateRequestTime(Iterable<String> runnables) {
+    for (String runnable : runnables) {
+      ExpectedCount oldCount = expectedCounts.get(runnable);
+      expectedCounts.put(runnable, new ExpectedCount(oldCount.getCount(), System.currentTimeMillis()));
+    }
+  }
+
+  synchronized int getExpected(String runnable) {
+    return expectedCounts.get(runnable).getCount();
+  }
+
+  synchronized Map<String, ExpectedCount> getAll() {
+    return ImmutableMap.copyOf(expectedCounts);
+  }
+
+  static final class ExpectedCount {
+    private final int count;
+    private final long timestamp;
+
+    private ExpectedCount(int count, long timestamp) {
+      this.count = count;
+      this.timestamp = timestamp;
+    }
+
+    int getCount() {
+      return count;
+    }
+
+    long getTimestamp() {
+      return timestamp;
+    }
+  }
+}
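
A short usage sketch of the bookkeeping above, with made-up runnable names and counts (it sits in
the same package because ExpectedContainers is package-private):

package org.apache.twill.internal.appmaster;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

// Illustrative only: exercises ExpectedContainers with hypothetical runnables.
final class ExpectedContainersSketch {

  public static void main(String[] args) {
    ExpectedContainers expected =
      new ExpectedContainers(ImmutableMap.of("worker", 3, "aggregator", 1));

    expected.setExpected("worker", 5);                           // new count, timestamp refreshed
    expected.updateRequestTime(ImmutableList.of("aggregator"));  // same count, timestamp refreshed

    System.out.println("workers expected: " + expected.getExpected("worker"));
    for (ExpectedContainers.ExpectedCount count : expected.getAll().values()) {
      System.out.println(count.getCount() + " as of " + count.getTimestamp());
    }
  }
}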

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/LoggerContextListenerAdapter.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/LoggerContextListenerAdapter.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/LoggerContextListenerAdapter.java
new file mode 100644
index 0000000..2d41aa6
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/LoggerContextListenerAdapter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.spi.LoggerContextListener;
+
+/**
+ * A {@link LoggerContextListener} adapter that provides no-op implementations of all callbacks,
+ * so that subclasses only need to override the ones they are interested in.
+ */
+abstract class LoggerContextListenerAdapter implements LoggerContextListener {
+
+  private final boolean resetResistant;
+
+  protected LoggerContextListenerAdapter(boolean resetResistant) {
+    this.resetResistant = resetResistant;
+  }
+
+  @Override
+  public final boolean isResetResistant() {
+    return resetResistant;
+  }
+
+  @Override
+  public void onStart(LoggerContext context) {
+  }
+
+  @Override
+  public void onReset(LoggerContext context) {
+  }
+
+  @Override
+  public void onStop(LoggerContext context) {
+  }
+
+  @Override
+  public void onLevelChange(Logger logger, Level level) {
+  }
+}
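
The adapter exists so that subclasses override only the callbacks they care about. A hypothetical
subclass (name and reset handling are illustrative, not part of this patch):

package org.apache.twill.internal.appmaster;

import ch.qos.logback.classic.LoggerContext;

// Illustrative only: reacts to logback context resets and inherits no-ops for everything else.
final class ResetAwareListener extends LoggerContextListenerAdapter {

  ResetAwareListener() {
    super(true);  // resetResistant = true, so the listener survives context resets
  }

  @Override
  public void onReset(LoggerContext context) {
    // Re-attach appenders or re-apply log levels here after the context has been reset.
  }
}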

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
new file mode 100644
index 0000000..002d2a5
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.api.RuntimeSpecification;
+
+/**
+ * Package-private class to help the application master track an in-progress container request.
+ */
+final class ProvisionRequest {
+  private final RuntimeSpecification runtimeSpec;
+  private final String requestId;
+  private int requestCount;
+
+  ProvisionRequest(RuntimeSpecification runtimeSpec, String requestId, int requestCount) {
+    this.runtimeSpec = runtimeSpec;
+    this.requestId = requestId;
+    this.requestCount = requestCount;
+  }
+
+  RuntimeSpecification getRuntimeSpec() {
+    return runtimeSpec;
+  }
+
+  String getRequestId() {
+    return requestId;
+  }
+
+  /**
+   * Called to notify that a container has been provisioned for this request.
+   * @return {@code true} if all requested containers have been provisioned.
+   */
+  boolean containerAcquired() {
+    requestCount--;
+    return requestCount == 0;
+  }
+}
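
The application master simply decrements the outstanding count each time a container arrives and
treats the request as satisfied once it reaches zero. A sketch of that drain (the request id, count
and the source of the RuntimeSpecification are hypothetical):

package org.apache.twill.internal.appmaster;

import org.apache.twill.api.RuntimeSpecification;

// Illustrative only: drains a ProvisionRequest as containers are acquired.
final class ProvisionRequestSketch {

  static void drain(RuntimeSpecification runtimeSpec) {
    ProvisionRequest request = new ProvisionRequest(runtimeSpec, "request-1", 3);
    for (int i = 1; i <= 3; i++) {
      boolean satisfied = request.containerAcquired();
      // satisfied is false for the first two containers and true once the third is acquired
      System.out.println(request.getRequestId() + " container " + i + " satisfied=" + satisfied);
    }
  }
}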

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
new file mode 100644
index 0000000..7f28443
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.TwillSpecification;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Data structure for holding a set of runnable specifications grouped by resource capability.
+ */
+final class RunnableContainerRequest {
+  private final TwillSpecification.Order.Type orderType;
+  private final Iterator<Map.Entry<Resource, Collection<RuntimeSpecification>>> requests;
+
+  RunnableContainerRequest(TwillSpecification.Order.Type orderType,
+                           Multimap<Resource, RuntimeSpecification> requests) {
+    this.orderType = orderType;
+    this.requests = requests.asMap().entrySet().iterator();
+  }
+
+  TwillSpecification.Order.Type getOrderType() {
+    return orderType;
+  }
+
+  /**
+   * Removes a resource request and returns it.
+   * @return The {@link Resource} and {@link Collection} of {@link RuntimeSpecification}, or
+   *         {@code null} if there are no more requests.
+   */
+  Map.Entry<Resource, ? extends Collection<RuntimeSpecification>> takeRequest() {
+    Map.Entry<Resource, Collection<RuntimeSpecification>> next = Iterators.getNext(requests, null);
+    return next == null ? null : Maps.immutableEntry(next.getKey(), ImmutableList.copyOf(next.getValue()));
+  }
+}
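
takeRequest() is meant to be polled until it returns null, yielding one resource capability and the
runnables that share it on each call. A sketch of that drain loop (same package because the class is
package-private; the printing is illustrative):

package org.apache.twill.internal.appmaster;

import java.util.Collection;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.twill.api.RuntimeSpecification;

// Illustrative only: consumes a RunnableContainerRequest one capability group at a time.
final class RunnableContainerRequestSketch {

  static void drain(RunnableContainerRequest containerRequest) {
    Map.Entry<Resource, ? extends Collection<RuntimeSpecification>> entry;
    while ((entry = containerRequest.takeRequest()) != null) {
      Resource capability = entry.getKey();
      for (RuntimeSpecification spec : entry.getValue()) {
        // One container of this capability would be requested per runnable here.
        System.out.println(spec.getName() + " -> " + capability.getMemory() + " MB");
      }
    }
  }
}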

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
new file mode 100644
index 0000000..b4b27a9
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.yarn.AbstractYarnProcessLauncher;
+import org.apache.twill.internal.yarn.YarnContainerInfo;
+import org.apache.twill.internal.yarn.YarnLaunchContext;
+import org.apache.twill.internal.yarn.YarnNMClient;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * A process launcher for launching a runnable in a YARN container through the {@link YarnNMClient}.
+ */
+public final class RunnableProcessLauncher extends AbstractYarnProcessLauncher<YarnContainerInfo> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RunnableProcessLauncher.class);
+
+  private final YarnContainerInfo containerInfo;
+  private final YarnNMClient nmClient;
+  private boolean launched;
+
+  public RunnableProcessLauncher(YarnContainerInfo containerInfo, YarnNMClient nmClient) {
+    super(containerInfo);
+    this.containerInfo = containerInfo;
+    this.nmClient = nmClient;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("container", containerInfo)
+      .toString();
+  }
+
+  @Override
+  protected <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext) {
+    Map<String, String> env = Maps.newHashMap(launchContext.getEnvironment());
+
+    // Set extra environments
+    env.put(EnvKeys.YARN_CONTAINER_ID, containerInfo.getId());
+    env.put(EnvKeys.YARN_CONTAINER_HOST, containerInfo.getHost().getHostName());
+    env.put(EnvKeys.YARN_CONTAINER_PORT, Integer.toString(containerInfo.getPort()));
+    env.put(EnvKeys.YARN_CONTAINER_MEMORY_MB, Integer.toString(containerInfo.getMemoryMB()));
+    env.put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, Integer.toString(containerInfo.getVirtualCores()));
+
+    launchContext.setEnvironment(env);
+
+    LOG.info("Launching in container {}, {}", containerInfo.getId(), launchContext.getCommands());
+    final Cancellable cancellable = nmClient.start(containerInfo, launchContext);
+    launched = true;
+
+    return new ProcessController<R>() {
+      @Override
+      public R getReport() {
+        // No reporting support for runnable launch yet.
+        return null;
+      }
+
+      @Override
+      public void cancel() {
+        cancellable.cancel();
+      }
+    };
+  }
+
+  public boolean isLaunched() {
+    return launched;
+  }
+}
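
The launcher passes container metadata to the runnable through environment variables, so the
container side can read it back with System.getenv using the same EnvKeys constants. A small
read-back sketch (class name and printing are illustrative):

import org.apache.twill.internal.EnvKeys;

// Illustrative only: reads the environment set by RunnableProcessLauncher from inside the container.
public final class ContainerEnvSketch {

  public static void main(String[] args) {
    String containerId = System.getenv(EnvKeys.YARN_CONTAINER_ID);
    String host = System.getenv(EnvKeys.YARN_CONTAINER_HOST);
    int memoryMb = Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_MEMORY_MB));
    int virtualCores = Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES));
    System.out.println(containerId + " on " + host + ": " + memoryMb + " MB, " + virtualCores + " cores");
  }
}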