You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:28 UTC

[12/28] [TWILL-14] Bootstrapping for the site generation. Reorganization of the source tree happens:

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
new file mode 100644
index 0000000..b4b27a9
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.yarn.AbstractYarnProcessLauncher;
+import org.apache.twill.internal.yarn.YarnContainerInfo;
+import org.apache.twill.internal.yarn.YarnLaunchContext;
+import org.apache.twill.internal.yarn.YarnNMClient;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ *
+ */
+public final class RunnableProcessLauncher extends AbstractYarnProcessLauncher<YarnContainerInfo> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RunnableProcessLauncher.class);
+
+  private final YarnContainerInfo containerInfo;
+  private final YarnNMClient nmClient;
+  private boolean launched;
+
+  public RunnableProcessLauncher(YarnContainerInfo containerInfo, YarnNMClient nmClient) {
+    super(containerInfo);
+    this.containerInfo = containerInfo;
+    this.nmClient = nmClient;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("container", containerInfo)
+      .toString();
+  }
+
+  @Override
+  protected <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext) {
+    Map<String, String> env = Maps.newHashMap(launchContext.getEnvironment());
+
+    // Set extra environments
+    env.put(EnvKeys.YARN_CONTAINER_ID, containerInfo.getId());
+    env.put(EnvKeys.YARN_CONTAINER_HOST, containerInfo.getHost().getHostName());
+    env.put(EnvKeys.YARN_CONTAINER_PORT, Integer.toString(containerInfo.getPort()));
+    env.put(EnvKeys.YARN_CONTAINER_MEMORY_MB, Integer.toString(containerInfo.getMemoryMB()));
+    env.put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, Integer.toString(containerInfo.getVirtualCores()));
+
+    launchContext.setEnvironment(env);
+
+    LOG.info("Launching in container {}, {}", containerInfo.getId(), launchContext.getCommands());
+    final Cancellable cancellable = nmClient.start(containerInfo, launchContext);
+    launched = true;
+
+    return new ProcessController<R>() {
+      @Override
+      public R getReport() {
+        // No reporting support for runnable launch yet.
+        return null;
+
+      }
+
+      @Override
+      public void cancel() {
+        cancellable.cancel();
+      }
+    };
+  }
+
+  public boolean isLaunched() {
+    return launched;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
new file mode 100644
index 0000000..beef0d4
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.ServiceController;
+import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.internal.ContainerInfo;
+import org.apache.twill.internal.DefaultResourceReport;
+import org.apache.twill.internal.DefaultTwillRunResources;
+import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.TwillContainerController;
+import org.apache.twill.internal.TwillContainerLauncher;
+import org.apache.twill.internal.container.TwillContainerMain;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.yarn.YarnContainerStatus;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.Table;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A helper class for ApplicationMasterService to keep track of running containers and to interact
+ * with them.
+ */
+final class RunningContainers {
+  private static final Logger LOG = LoggerFactory.getLogger(RunningContainers.class);
+
+  /**
+   * Function to return cardinality of a given BitSet.
+   */
+  private static final Function<BitSet, Integer> BITSET_CARDINALITY = new Function<BitSet, Integer>() {
+    @Override
+    public Integer apply(BitSet input) {
+      return input.cardinality();
+    }
+  };
+
+  // Table of <runnableName, containerId, controller>
+  private final Table<String, String, TwillContainerController> containers;
+
+  // Map from runnableName to a BitSet, with the <instanceId> bit turned on for having an instance running.
+  private final Map<String, BitSet> runnableInstances;
+  private final DefaultResourceReport resourceReport;
+  private final Deque<String> startSequence;
+  private final Lock containerLock;
+  private final Condition containerChange;
+
+  RunningContainers(String appId, TwillRunResources appMasterResources) {
+    containers = HashBasedTable.create();
+    runnableInstances = Maps.newHashMap();
+    startSequence = Lists.newLinkedList();
+    containerLock = new ReentrantLock();
+    containerChange = containerLock.newCondition();
+    resourceReport = new DefaultResourceReport(appId, appMasterResources);
+  }
+
+  /**
+   * Returns {@code true} if there is no live container.
+   */
+  boolean isEmpty() {
+    containerLock.lock();
+    try {
+      return runnableInstances.isEmpty();
+    } finally {
+      containerLock.unlock();
+    }
+  }
+
+  void start(String runnableName, ContainerInfo containerInfo, TwillContainerLauncher launcher) {
+    containerLock.lock();
+    try {
+      int instanceId = getStartInstanceId(runnableName);
+      RunId runId = getRunId(runnableName, instanceId);
+      TwillContainerController controller = launcher.start(runId, instanceId,
+                                                           TwillContainerMain.class, "$HADOOP_CONF_DIR");
+      containers.put(runnableName, containerInfo.getId(), controller);
+
+      TwillRunResources resources = new DefaultTwillRunResources(instanceId,
+                                                                 containerInfo.getId(),
+                                                                 containerInfo.getVirtualCores(),
+                                                                 containerInfo.getMemoryMB(),
+                                                                 containerInfo.getHost().getHostName());
+      resourceReport.addRunResources(runnableName, resources);
+
+      if (startSequence.isEmpty() || !runnableName.equals(startSequence.peekLast())) {
+        startSequence.addLast(runnableName);
+      }
+      containerChange.signalAll();
+
+    } finally {
+      containerLock.unlock();
+    }
+  }
+
+  ResourceReport getResourceReport() {
+    return resourceReport;
+  }
+
+  /**
+   * Stops the running container with the largest instanceId of the given runnable and removes it from tracking.
+   */
+  void removeLast(String runnableName) {
+    containerLock.lock();
+    try {
+      int maxInstanceId = getMaxInstanceId(runnableName);
+      if (maxInstanceId < 0) {
+        LOG.warn("No running container found for {}", runnableName);
+        return;
+      }
+
+      String lastContainerId = null;
+      TwillContainerController lastController = null;
+
+      // Find the controller with the maxInstanceId
+      for (Map.Entry<String, TwillContainerController> entry : containers.row(runnableName).entrySet()) {
+        if (getInstanceId(entry.getValue().getRunId()) == maxInstanceId) {
+          lastContainerId = entry.getKey();
+          lastController = entry.getValue();
+          break;
+        }
+      }
+      // Preconditions message templates only substitute %s; SLF4J-style {} would be printed literally.
+      Preconditions.checkState(lastContainerId != null,
+                               "No container found for %s with instanceId = %s", runnableName, maxInstanceId);
+
+      LOG.info("Stopping service: {} {}", runnableName, lastController.getRunId());
+      lastController.stopAndWait();
+      containers.remove(runnableName, lastContainerId);
+      removeInstanceId(runnableName, maxInstanceId);
+      resourceReport.removeRunnableResources(runnableName, lastContainerId);
+      containerChange.signalAll();
+    } finally {
+      containerLock.unlock();
+    }
+  }
+
+  /**
+   * Blocks until the number of running instances of the given runnable equals the given count.
+   */
+  void waitForCount(String runnableName, int count) throws InterruptedException {
+    containerLock.lock();
+    try {
+      while (getRunningInstances(runnableName) != count) {
+        containerChange.await();
+      }
+    } finally {
+      containerLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the number of running instances of the given runnable.
+   */
+  int count(String runnableName) {
+    containerLock.lock();
+    try {
+      return getRunningInstances(runnableName);
+    } finally {
+      containerLock.unlock();
+    }
+  }
+
+  /**
+   * Returns a Map containing the number of running instances of each runnable.
+   */
+  Map<String, Integer> countAll() {
+    containerLock.lock();
+    try {
+      return ImmutableMap.copyOf(Maps.transformValues(runnableInstances, BITSET_CARDINALITY));
+    } finally {
+      containerLock.unlock();
+    }
+  }
+
+  void sendToAll(Message message, Runnable completion) {
+    containerLock.lock();
+    try {
+      if (containers.isEmpty()) {
+        completion.run();
+      }
+
+      // Sends the command to all running containers
+      AtomicInteger count = new AtomicInteger(containers.size());
+      for (Map.Entry<String, Map<String, TwillContainerController>> entry : containers.rowMap().entrySet()) {
+        for (TwillContainerController controller : entry.getValue().values()) {
+          sendMessage(entry.getKey(), message, controller, count, completion);
+        }
+      }
+    } finally {
+      containerLock.unlock();
+    }
+  }
+
+  void sendToRunnable(String runnableName, Message message, Runnable completion) {
+    containerLock.lock();
+    try {
+      Collection<TwillContainerController> controllers = containers.row(runnableName).values();
+      if (controllers.isEmpty()) {
+        completion.run();
+      }
+
+      AtomicInteger count = new AtomicInteger(controllers.size());
+      for (TwillContainerController controller : controllers) {
+        sendMessage(runnableName, message, controller, count, completion);
+      }
+    } finally {
+      containerLock.unlock();
+    }
+  }
+
+  /**
+   * Stops all running services. Only called when the AppMaster stops.
+   */
+  void stopAll() {
+    containerLock.lock();
+    try {
+      // Stop it one by one in reverse order of start sequence
+      Iterator<String> itor = startSequence.descendingIterator();
+      List<ListenableFuture<ServiceController.State>> futures = Lists.newLinkedList();
+      while (itor.hasNext()) {
+        String runnableName = itor.next();
+        LOG.info("Stopping all instances of " + runnableName);
+
+        futures.clear();
+        // Parallel stops all running containers of the current runnable.
+        for (TwillContainerController controller : containers.row(runnableName).values()) {
+          futures.add(controller.stop());
+        }
+        // Wait for containers to stop. Assumes the future returned by Futures.successfulAsList won't throw exception.
+        Futures.getUnchecked(Futures.successfulAsList(futures));
+
+        LOG.info("Terminated all instances of " + runnableName);
+      }
+      containers.clear();
+      runnableInstances.clear();
+    } finally {
+      containerLock.unlock();
+    }
+  }
+
+  Set<String> getContainerIds() {
+    containerLock.lock();
+    try {
+      return ImmutableSet.copyOf(containers.columnKeySet());
+    } finally {
+      containerLock.unlock();
+    }
+  }
+
+  /**
+   * Handle completion of container.
+   * @param status The completion status.
+   * @param restartRunnables Set of runnable names that requires restart.
+   */
+  void handleCompleted(YarnContainerStatus status, Multiset<String> restartRunnables) {
+    String containerId = status.getContainerId();
+    int exitStatus = status.getExitStatus();
+    ContainerState state = status.getState();
+    // The status is read before locking so that a failure above cannot leave the lock held.
+    containerLock.lock();
+    try {
+      Map<String, TwillContainerController> lookup = containers.column(containerId);
+      if (lookup.isEmpty()) {
+        // It's OK because if a container is stopped through removeLast, this would be empty.
+        return;
+      }
+
+      if (lookup.size() != 1) {
+        LOG.warn("More than one controller found for container {}", containerId);
+      }
+
+      if (exitStatus != 0) {
+        LOG.warn("Container {} exited abnormally with state {}, exit code {}. Re-request the container.",
+                 containerId, state, exitStatus);
+        restartRunnables.add(lookup.keySet().iterator().next());
+      } else {
+        LOG.info("Container {} exited normally with state {}", containerId, state);
+      }
+
+      for (Map.Entry<String, TwillContainerController> completedEntry : lookup.entrySet()) {
+        String runnableName = completedEntry.getKey();
+        TwillContainerController controller = completedEntry.getValue();
+        controller.completed(exitStatus);
+
+        removeInstanceId(runnableName, getInstanceId(controller.getRunId()));
+        resourceReport.removeRunnableResources(runnableName, containerId);
+      }
+
+      lookup.clear();
+      containerChange.signalAll();
+    } finally {
+      containerLock.unlock();
+    }
+  }
+
+  /**
+   * Sends a command through the given {@link org.apache.twill.internal.TwillContainerController} of a runnable. Decrements the count
+   * when the sending of command completed. Triggers completion when count reaches zero.
+   */
+  private void sendMessage(final String runnableName, final Message message,
+                           final TwillContainerController controller, final AtomicInteger count,
+                           final Runnable completion) {
+    Futures.addCallback(controller.sendMessage(message), new FutureCallback<Message>() {
+      @Override
+      public void onSuccess(Message result) {
+        if (count.decrementAndGet() == 0) {
+          completion.run();
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        try {
+          LOG.error("Failed to send message. Runnable: {}, RunId: {}, Message: {}.",
+                    runnableName, controller.getRunId(), message, t);
+        } finally {
+          if (count.decrementAndGet() == 0) {
+            completion.run();
+          }
+        }
+      }
+    });
+  }
+
+  /**
+   * Returns the instanceId to start the given runnable.
+   */
+  private int getStartInstanceId(String runnableName) {
+    BitSet instances = runnableInstances.get(runnableName);
+    if (instances == null) {
+      instances = new BitSet();
+      runnableInstances.put(runnableName, instances);
+    }
+    int instanceId = instances.nextClearBit(0);
+    instances.set(instanceId);
+    return instanceId;
+  }
+
+  private void removeInstanceId(String runnableName, int instanceId) {
+    BitSet instances = runnableInstances.get(runnableName);
+    if (instances == null) {
+      return;
+    }
+    instances.clear(instanceId);
+    if (instances.isEmpty()) {
+      runnableInstances.remove(runnableName);
+    }
+  }
+
+  /**
+   * Returns the largest instanceId for the given runnable. Returns -1 if no container is running.
+   */
+  private int getMaxInstanceId(String runnableName) {
+    BitSet instances = runnableInstances.get(runnableName);
+    if (instances == null || instances.isEmpty()) {
+      return -1;
+    }
+    return instances.length() - 1;
+  }
+
+  /**
+   * Returns the number of running instances for the given runnable.
+   */
+  private int getRunningInstances(String runableName) {
+    BitSet instances = runnableInstances.get(runableName);
+    return instances == null ? 0 : instances.cardinality();
+  }
+
+  private RunId getRunId(String runnableName, int instanceId) {
+    RunId baseId;
+
+    Collection<TwillContainerController> controllers = containers.row(runnableName).values();
+    if (controllers.isEmpty()) {
+      baseId = RunIds.generate();
+    } else {
+      String id = controllers.iterator().next().getRunId().getId();
+      baseId = RunIds.fromString(id.substring(0, id.lastIndexOf('-')));
+    }
+
+    return RunIds.fromString(baseId.getId() + '-' + instanceId);
+  }
+
+  private int getInstanceId(RunId runId) {
+    String id = runId.getId();
+    return Integer.parseInt(id.substring(id.lastIndexOf('-') + 1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
new file mode 100644
index 0000000..ca299e0
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.internal.json.ResourceReportAdapter;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpContentCompressor;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.jboss.netty.util.CharsetUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Webservice that the Application Master will register back to the resource manager
+ * for clients to track application progress.  Currently used purely for getting a
+ * breakdown of resource usage as a {@link org.apache.twill.api.ResourceReport}.
+ */
+public final class TrackerService extends AbstractIdleService {
+
+  // TODO: This is temporary. When support more REST API, this would get moved.
+  public static final String PATH = "/resources";
+
+  private static final Logger LOG  = LoggerFactory.getLogger(TrackerService.class);
+  private static final int NUM_BOSS_THREADS = 1;
+  private static final int CLOSE_CHANNEL_TIMEOUT = 5;
+  private static final int MAX_INPUT_SIZE = 100 * 1024 * 1024;
+
+  private final String host;
+  private ServerBootstrap bootstrap;
+  private InetSocketAddress bindAddress;
+  private URL url;
+  private final ChannelGroup channelGroup;
+  private final ResourceReport resourceReport;
+
+  /**
+   * Initialize the service.
+   *
+   * @param resourceReport live report that the service will return to clients.
+   * @param appMasterHost the application master host.
+   */
+  public TrackerService(ResourceReport resourceReport, String appMasterHost) {
+    this.channelGroup = new DefaultChannelGroup("appMasterTracker");
+    this.resourceReport = resourceReport;
+    this.host = appMasterHost;
+  }
+
+  /**
+   * Returns the address this tracker service is bound to.
+   */
+  public InetSocketAddress getBindAddress() {
+    return bindAddress;
+  }
+
+  /**
+   * @return tracker url.
+   */
+  public URL getUrl() {
+    return url;
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    Executor bossThreads = Executors.newFixedThreadPool(NUM_BOSS_THREADS,
+                                                        new ThreadFactoryBuilder()
+                                                          .setDaemon(true)
+                                                          .setNameFormat("boss-thread")
+                                                          .build());
+
+    Executor workerThreads = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+                                                             .setDaemon(true)
+                                                             .setNameFormat("worker-thread#%d")
+                                                             .build());
+
+    ChannelFactory factory = new NioServerSocketChannelFactory(bossThreads, workerThreads);
+
+    bootstrap = new ServerBootstrap(factory);
+
+    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+      public ChannelPipeline getPipeline() {
+        ChannelPipeline pipeline = Channels.pipeline();
+
+        pipeline.addLast("decoder", new HttpRequestDecoder());
+        pipeline.addLast("aggregator", new HttpChunkAggregator(MAX_INPUT_SIZE));
+        pipeline.addLast("encoder", new HttpResponseEncoder());
+        pipeline.addLast("compressor", new HttpContentCompressor());
+        pipeline.addLast("handler", new ReportHandler(resourceReport));
+
+        return pipeline;
+      }
+    });
+
+    Channel channel = bootstrap.bind(new InetSocketAddress(host, 0));
+    bindAddress = (InetSocketAddress) channel.getLocalAddress();
+    url = URI.create(String.format("http://%s:%d", host, bindAddress.getPort()))
+             .resolve(TrackerService.PATH).toURL();
+    channelGroup.add(channel);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    try {
+      if (!channelGroup.close().await(CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS)) {
+        LOG.warn("Timeout when closing all channels.");
+      }
+    } finally {
+      bootstrap.releaseExternalResources();
+    }
+  }
+
+  /**
+   * Handler to return resources used by this application master, which will be available through
+   * the host and port set when this application master registered itself to the resource manager.
+   */
+  public class ReportHandler extends SimpleChannelUpstreamHandler {
+    private final ResourceReport report;
+    private final ResourceReportAdapter reportAdapter;
+
+    public ReportHandler(ResourceReport report) {
+      this.report = report;
+      this.reportAdapter = ResourceReportAdapter.create();
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+      HttpRequest request = (HttpRequest) e.getMessage();
+      if (!isValid(request)) {
+        write404(e);
+        return;
+      }
+
+      writeResponse(e);
+    }
+
+    // only accepts GET on /resources for now
+    private boolean isValid(HttpRequest request) {
+      return (request.getMethod() == HttpMethod.GET) && PATH.equals(request.getUri());
+    }
+
+    private void write404(MessageEvent e) {
+      HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
+      ChannelFuture future = e.getChannel().write(response);
+      future.addListener(ChannelFutureListener.CLOSE);
+    }
+
+    private void writeResponse(MessageEvent e) {
+      HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+      response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=UTF-8");
+
+      ChannelBuffer content = ChannelBuffers.dynamicBuffer();
+      Writer writer = new OutputStreamWriter(new ChannelBufferOutputStream(content), CharsetUtil.UTF_8);
+      reportAdapter.toJson(report, writer);
+      try {
+        writer.close();
+      } catch (IOException e1) {
+        LOG.error("error writing resource report", e1);
+      }
+      response.setContent(content);
+      ChannelFuture future = e.getChannel().write(response);
+      future.addListener(ChannelFutureListener.CLOSE);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+      e.getChannel().close();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/package-info.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/package-info.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/package-info.java
new file mode 100644
index 0000000..bf8e677
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains implementation of Twill application master.
+ */
+package org.apache.twill.internal.appmaster;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
new file mode 100644
index 0000000..bbd6c10
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.container;
+
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.discovery.DiscoveryService;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.internal.Arguments;
+import org.apache.twill.internal.BasicTwillContext;
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.ContainerInfo;
+import org.apache.twill.internal.EnvContainerInfo;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.ServiceMain;
+import org.apache.twill.internal.json.ArgumentsCodec;
+import org.apache.twill.internal.json.TwillSpecificationAdapter;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Main class of a Twill container process; it wires up and runs a {@link TwillContainerService}.
+ */
+public final class TwillContainerMain extends ServiceMain {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TwillContainerMain.class);
+
+  /**
+   * Main method for launching a {@link TwillContainerService} which runs
+   * a {@link org.apache.twill.api.TwillRunnable}.
+   */
+  public static void main(final String[] args) throws Exception {
+    // Try to load the secure store from localized file, which AM requested RM to localize it for this container.
+    loadSecureStore();
+
+    String zkConnectStr = System.getenv(EnvKeys.TWILL_ZK_CONNECT);
+    File twillSpecFile = new File(Constants.Files.TWILL_SPEC);
+    RunId appRunId = RunIds.fromString(System.getenv(EnvKeys.TWILL_APP_RUN_ID));
+    RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
+    String runnableName = System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
+    int instanceId = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID));
+    int instanceCount = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT));
+
+    ZKClientService zkClientService = ZKClientServices.delegate(
+      ZKClients.reWatchOnExpire(
+        ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
+                                 RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
+    DiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
+
+    TwillSpecification twillSpec = loadTwillSpec(twillSpecFile);
+    RuntimeSpecification runtimeSpec = Preconditions.checkNotNull(
+      twillSpec.getRunnables().get(runnableName), "Runnable %s is not in the Twill specification.", runnableName);
+    renameLocalFiles(runtimeSpec);
+    TwillRunnableSpecification runnableSpec = runtimeSpec.getRunnableSpecification();
+    ContainerInfo containerInfo = new EnvContainerInfo();
+    Arguments arguments = decodeArgs();
+    BasicTwillContext context = new BasicTwillContext(
+      runId, appRunId, containerInfo.getHost(),
+      arguments.getRunnableArguments().get(runnableName).toArray(new String[0]),
+      arguments.getArguments().toArray(new String[0]),
+      runnableSpec, instanceId, discoveryService, instanceCount,
+      containerInfo.getMemoryMB(), containerInfo.getVirtualCores());
+
+    Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
+    Service service = new TwillContainerService(context, containerInfo,
+                                                getContainerZKClient(zkClientService, appRunId, runnableName),
+                                                runId, runnableSpec, getClassLoader(),
+                                                createAppLocation(conf));
+    new TwillContainerMain().doMain(zkClientService, service);
+  }
+
+  private static void loadSecureStore() throws IOException {
+    File file = new File(Constants.Files.CREDENTIALS);
+    if (!UserGroupInformation.isSecurityEnabled() || !file.exists()) {
+      return;
+    }
+
+    Credentials credentials = new Credentials();
+    DataInputStream input = new DataInputStream(new FileInputStream(file));
+    try {
+      credentials.readTokenStorageStream(input);
+    } finally {
+      input.close();
+    }
+    UserGroupInformation.getCurrentUser().addCredentials(credentials);
+    LOG.info("Secure store updated from {}", file);
+  }
+
+  /**
+   * Renames each localized archive from "declared name + extension of its URI" back to its declared name.
+   * An archive whose URI has no extension is skipped; presumably it was localized without a suffix.
+   */
+  private static void renameLocalFiles(RuntimeSpecification runtimeSpec) {
+    for (LocalFile file : runtimeSpec.getLocalFiles()) {
+      if (file.isArchive()) {
+        String path = file.getURI().toString();
+        // lastIndexOf returns -1 for a URI without '.'; calling substring with it would throw.
+        int extStart = path.endsWith(".tar.gz") ? path.length() - ".tar.gz".length() : path.lastIndexOf('.');
+        if (extStart >= 0) {
+          String name = file.getName() + path.substring(extStart);
+          Preconditions.checkState(new File(name).renameTo(new File(file.getName())),
+                                   "Fail to rename file from %s to %s.", name, file.getName());
+        }
+      }
+    }
+  }
+
+  private static ZKClient getContainerZKClient(ZKClient zkClient, RunId appRunId, String runnableName) {
+    return ZKClients.namespace(zkClient, String.format("/%s/runnables/%s", appRunId, runnableName));
+  }
+
+  /**
+   * Returns the ClassLoader for the runnable: the thread context one, or the system one if that is not set.
+   */
+  private static ClassLoader getClassLoader() {
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    return classLoader == null ? ClassLoader.getSystemClassLoader() : classLoader;
+  }
+
+  private static TwillSpecification loadTwillSpec(File specFile) throws IOException {
+    Reader reader = Files.newReader(specFile, Charsets.UTF_8);
+    try {
+      return TwillSpecificationAdapter.create().fromJson(reader);
+    } finally {
+      reader.close();
+    }
+  }
+
+  private static Arguments decodeArgs() throws IOException {
+    return ArgumentsCodec.decode(Files.newReaderSupplier(new File(Constants.Files.ARGUMENTS), Charsets.UTF_8));
+  }
+
+  @Override
+  protected String getHostname() {
+    return System.getenv(EnvKeys.YARN_CONTAINER_HOST);
+  }
+
+  @Override
+  protected String getKafkaZKConnect() {
+    return System.getenv(EnvKeys.TWILL_LOG_KAFKA_ZK);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
new file mode 100644
index 0000000..f5bc1f2
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.container;
+
+import org.apache.twill.api.Command;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.TwillRunnable;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.common.Threads;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.AbstractTwillService;
+import org.apache.twill.internal.AbstractTwillService;
+import org.apache.twill.internal.BasicTwillContext;
+import org.apache.twill.internal.ContainerInfo;
+import org.apache.twill.internal.ContainerLiveNodeData;
+import org.apache.twill.internal.ZKServiceDecorator;
+import org.apache.twill.internal.logging.Loggings;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.MessageCallback;
+import org.apache.twill.internal.utils.Instances;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * This class acts as a yarn container and runs a {@link org.apache.twill.api.TwillRunnable}.
+ */
+public final class TwillContainerService extends AbstractTwillService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TwillContainerService.class);
+
+  private final TwillRunnableSpecification specification;
+  private final ClassLoader classLoader;
+  private final ContainerLiveNodeData containerLiveNode;
+  private final BasicTwillContext context;
+  private final ZKServiceDecorator serviceDelegate;
+  private ExecutorService commandExecutor;
+  private TwillRunnable runnable;
+
+  public TwillContainerService(BasicTwillContext context, ContainerInfo containerInfo, ZKClient zkClient,
+                               RunId runId, TwillRunnableSpecification specification, ClassLoader classLoader,
+                               Location applicationLocation) {
+    super(applicationLocation);
+    // Set the live node data first so that the supplier handed to the decorator never sees an unset field.
+    this.containerLiveNode = new ContainerLiveNodeData(containerInfo.getId(),
+                                                       containerInfo.getHost().getCanonicalHostName());
+    this.specification = specification;
+    this.classLoader = classLoader;
+    this.context = context;
+    this.serviceDelegate = new ZKServiceDecorator(zkClient, runId, createLiveNodeSupplier(), new ServiceDelegate());
+  }
+
+  /**
+   * Acknowledges messages consumed by handleSecureStoreUpdate; runs other commands on the command executor.
+   */
+  private ListenableFuture<String> processMessage(final String messageId, final Message message) {
+    LOG.debug("Message received: {} {}.", messageId, message);
+    if (handleSecureStoreUpdate(message)) {
+      return Futures.immediateFuture(messageId);
+    }
+
+    final SettableFuture<String> result = SettableFuture.create();
+    Command command = message.getCommand();
+    if (message.getType() == Message.Type.SYSTEM
+          && "instances".equals(command.getCommand()) && command.getOptions().containsKey("count")) {
+      context.setInstanceCount(Integer.parseInt(command.getOptions().get("count")));
+    }
+
+    commandExecutor.execute(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          runnable.handleCommand(message.getCommand());
+          result.set(messageId);
+        } catch (Exception e) {
+          result.setException(e);
+        }
+      }
+    });
+    return result;
+  }
+
+  private Supplier<? extends JsonElement> createLiveNodeSupplier() {
+    return new Supplier<JsonElement>() {
+      @Override
+      public JsonElement get() {
+        return new Gson().toJsonTree(containerLiveNode);
+      }
+    };
+  }
+
+  @Override
+  protected Service getServiceDelegate() {
+    return serviceDelegate;
+  }
+
+  private final class ServiceDelegate extends AbstractExecutionThreadService implements MessageCallback {
+    @Override
+    protected void startUp() throws Exception {
+      commandExecutor = Executors.newSingleThreadExecutor(
+        Threads.createDaemonThreadFactory("runnable-command-executor"));
+
+      Class<?> runnableClass = classLoader.loadClass(specification.getClassName());
+      Preconditions.checkArgument(TwillRunnable.class.isAssignableFrom(runnableClass),
+                                  "Class %s is not instance of TwillRunnable.", specification.getClassName());
+      runnable = Instances.newInstance((Class<TwillRunnable>) runnableClass);
+      runnable.initialize(context);
+    }
+
+    @Override
+    protected void triggerShutdown() {
+      try {
+        runnable.stop();
+      } catch (Throwable t) {
+        LOG.error("Exception when stopping runnable.", t);
+      }
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      commandExecutor.shutdownNow();
+      try {
+        runnable.destroy();
+      } finally {
+        // Still flush the logs when destroy() throws; the exception keeps propagating afterwards.
+        Loggings.forceFlush();
+      }
+    }
+
+    @Override
+    protected void run() throws Exception {
+      runnable.run();
+    }
+
+    @Override
+    public ListenableFuture<String> onReceived(String messageId, Message message) {
+      // Only process message if the service is still alive
+      return state() == State.RUNNING ? processMessage(messageId, message) : Futures.immediateFuture(messageId);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
new file mode 100644
index 0000000..b810854
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.utils.Paths;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for {@link ProcessLauncher} implementations that launch processes on yarn.
+ *
+ * @param <T> Type of the object that contains information about the container that the process is going to launch.
+ */
+public abstract class AbstractYarnProcessLauncher<T> implements ProcessLauncher<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnProcessLauncher.class);
+
+  private final T containerInfo;
+
+  protected AbstractYarnProcessLauncher(T containerInfo) {
+    this.containerInfo = containerInfo;
+  }
+
+  @Override
+  public T getContainerInfo() {
+    return containerInfo;
+  }
+
+  @Override
+  public <C> PrepareLaunchContext prepareLaunch(Map<String, String> environments,
+                                                Iterable<LocalFile> resources, C credentials) {
+    // A null credentials object is accepted; launch() then does not set any credentials.
+    Preconditions.checkArgument(credentials == null || credentials instanceof Credentials,
+                                "Credentials should be of type %s", Credentials.class.getName());
+    return new PrepareLaunchContextImpl(environments, resources, (Credentials) credentials);
+  }
+
+  /**
+   * Whether the file extension gets appended to the localized resource name of an archive. Defaults to true.
+   */
+  protected boolean useArchiveSuffix() {
+    return true;
+  }
+
+  /**
+   * Performs the actual process launching with the given launch context; to be implemented by subclasses.
+   */
+  protected abstract <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext);
+
+  /**
+   * The {@link PrepareLaunchContext} that collects resources, environment and commands for one launch.
+   */
+  private final class PrepareLaunchContextImpl implements PrepareLaunchContext {
+
+    private final Credentials credentials;
+    private final YarnLaunchContext launchContext;
+    private final Map<String, YarnLocalResource> localResources;
+    private final Map<String, String> environment;
+    private final List<String> commands;
+
+    private PrepareLaunchContextImpl(Map<String, String> env, Iterable<LocalFile> files, Credentials credentials) {
+      this.credentials = credentials;
+      this.launchContext = YarnUtils.createLaunchContext();
+      this.localResources = Maps.newHashMap();
+      this.environment = Maps.newHashMap(env);
+      this.commands = Lists.newLinkedList();
+
+      for (LocalFile file : files) {
+        addLocalFile(file);
+      }
+    }
+
+    private void addLocalFile(LocalFile localFile) {
+      String resourceName = localFile.getName();
+      // Keep the extension in the resource name of an archive so that archive expansion by Yarn could work.
+      // The container launcher is expected to rename it back.
+      if (localFile.isArchive() && useArchiveSuffix()) {
+        String extension = Paths.getExtension(localFile.getURI().toString());
+        if (!extension.isEmpty()) {
+          resourceName = resourceName + '.' + extension;
+        }
+      }
+      localResources.put(resourceName, YarnUtils.createLocalResource(localFile));
+    }
+
+    @Override
+    public ResourcesAdder withResources() {
+      return new MoreResourcesImpl();
+    }
+
+    @Override
+    public AfterResources noResources() {
+      return new MoreResourcesImpl();
+    }
+
+    private final class MoreResourcesImpl implements MoreResources {
+
+      @Override
+      public MoreResources add(LocalFile localFile) {
+        addLocalFile(localFile);
+        return this;
+      }
+
+      @Override
+      public EnvironmentAdder withEnvironment() {
+        return finish();
+      }
+
+      @Override
+      public AfterEnvironment noEnvironment() {
+        return finish();
+      }
+
+      private MoreEnvironmentImpl finish() {
+        launchContext.setLocalResources(localResources);
+        return new MoreEnvironmentImpl();
+      }
+    }
+
+    private final class MoreEnvironmentImpl implements MoreEnvironment {
+
+      @Override
+      public CommandAdder withCommands() {
+        launchContext.setEnvironment(environment);
+        return new MoreCommandImpl();
+      }
+
+      @Override
+      public <V> MoreEnvironment add(String key, V value) {
+        environment.put(key, value.toString());
+        return this;
+      }
+    }
+
+    private final class MoreCommandImpl implements MoreCommand, StdOutSetter, StdErrSetter {
+
+      private final StringBuilder commandBuilder = new StringBuilder();
+
+      @Override
+      public StdOutSetter add(String cmd, String... args) {
+        commandBuilder.append(cmd);
+        for (int i = 0; i < args.length; i++) {
+          commandBuilder.append(' ').append(args[i]);
+        }
+        return this;
+      }
+
+      @Override
+      public <R> ProcessController<R> launch() {
+        // Credentials are only set on the launch context when they carry at least one token.
+        boolean hasTokens = credentials != null && !credentials.getAllTokens().isEmpty();
+        if (hasTokens) {
+          for (Token<?> token : credentials.getAllTokens()) {
+            LOG.info("Launch with delegation token {}", token);
+          }
+          launchContext.setCredentials(credentials);
+        }
+        launchContext.setCommands(commands);
+        return doLaunch(launchContext);
+      }
+
+      @Override
+      public MoreCommand redirectError(String stderr) {
+        redirect(2, stderr);
+        return noError();
+      }
+
+      @Override
+      public MoreCommand noError() {
+        // The command line is complete here: record it and clear the builder for the next command.
+        commands.add(commandBuilder.toString());
+        commandBuilder.setLength(0);
+        return this;
+      }
+
+      @Override
+      public StdErrSetter redirectOutput(String stdout) {
+        redirect(1, stdout);
+        return this;
+      }
+
+      @Override
+      public StdErrSetter noOutput() {
+        return this;
+      }
+
+      private void redirect(int type, String out) {
+        // Appends " <type>><LOG_DIR_EXPANSION_VAR>/<out>" to the command line being built.
+        commandBuilder.append(" " + type + ">" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + out);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
new file mode 100644
index 0000000..6f47b6c
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@link YarnAMClientFactory} that chooses the {@link YarnAMClient} class based on the Hadoop version.
+ */
+public final class VersionDetectYarnAMClientFactory implements YarnAMClientFactory {
+
+  private final Configuration conf;
+
+  public VersionDetectYarnAMClientFactory(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Creates the client by loading the implementation class by name.
+   *
+   * @return a new client created through the constructor that takes a {@link Configuration}
+   * @throws RuntimeException if the class cannot be loaded or instantiated
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public YarnAMClient create() {
+    try {
+      // The hadoop-2.0 and hadoop-2.1 implementations reside in this package and are referenced by name only.
+      String suffix = YarnUtils.isHadoop20() ? ".Hadoop20YarnAMClient" : ".Hadoop21YarnAMClient";
+      String clzName = getClass().getPackage().getName() + suffix;
+      Class<YarnAMClient> clz = (Class<YarnAMClient>) Class.forName(clzName);
+
+      return clz.getConstructor(Configuration.class).newInstance(conf);
+    } catch (Exception e) {
+      // propagate() rethrows unchecked exceptions as is and wraps checked ones in a RuntimeException.
+      throw Throwables.propagate(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
new file mode 100644
index 0000000..f9db959
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@link YarnAppClientFactory} that chooses the {@link YarnAppClient} class based on the Hadoop version.
+ */
+public final class VersionDetectYarnAppClientFactory implements YarnAppClientFactory {
+
+  /**
+   * Creates the client by loading the implementation class by name and invoking its constructor that takes
+   * a {@link Configuration}.
+   *
+   * @param configuration the configuration passed to the client constructor
+   * @return a new {@link YarnAppClient}
+   * @throws RuntimeException if the class cannot be loaded or instantiated
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public YarnAppClient create(Configuration configuration) {
+    try {
+      // The hadoop-2.0 and hadoop-2.1 implementations reside in this package and are referenced by name only.
+      String suffix = YarnUtils.isHadoop20() ? ".Hadoop20YarnAppClient" : ".Hadoop21YarnAppClient";
+      String clzName = getClass().getPackage().getName() + suffix;
+      Class<YarnAppClient> clz = (Class<YarnAppClient>) Class.forName(clzName);
+
+      return clz.getConstructor(Configuration.class).newInstance(configuration);
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
new file mode 100644
index 0000000..83ba6a8
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.internal.ProcessLauncher;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Abstraction of the interactions between the AM and YARN. It keeps YARN version specific code
+ * behind this interface, making multi-version compatibility easier.
+ */
+public interface YarnAMClient extends Service {
+
+  /**
+   * Builder that collects the settings of a container request.
+   */
+  abstract class ContainerRequestBuilder {
+
+    protected final Resource capability;
+    protected final int count;
+    protected final Set<String> hosts = Sets.newHashSet();
+    protected final Set<String> racks = Sets.newHashSet();
+    protected final Priority priority = Records.newRecord(Priority.class);
+
+    protected ContainerRequestBuilder(Resource capability, int count) {
+      this.capability = capability;
+      this.count = count;
+    }
+
+    public ContainerRequestBuilder addHosts(String firstHost, String...moreHosts) {
+      return add(hosts, firstHost, moreHosts);
+    }
+
+    public ContainerRequestBuilder addRacks(String firstRack, String...moreRacks) {
+      return add(racks, firstRack, moreRacks);
+    }
+
+    public ContainerRequestBuilder setPriority(int prio) {
+      priority.setPriority(prio);
+      return this;
+    }
+
+    /**
+     * Adds a container request. Returns a unique ID for the request.
+     */
+    public abstract String apply();
+
+    private <E> ContainerRequestBuilder add(Collection<E> target, E head, E... tail) {
+      target.add(head);
+      Collections.addAll(target, tail);
+      return this;
+    }
+  }
+
+  ContainerId getContainerId();
+
+  String getHost();
+
+  /**
+   * Sets the tracker address and tracker url. Call this method before calling {@link #start()}.
+   */
+  void setTracker(InetSocketAddress trackerAddr, URL trackerUrl);
+
+  /**
+   * Callback for the allocate call.
+   */
+  // TODO: Move AM heartbeat logic into this interface so AM only needs to handle callback.
+  interface AllocateHandler {
+    void acquired(List<ProcessLauncher<YarnContainerInfo>> launchers);
+
+    void completed(List<YarnContainerStatus> completed);
+  }
+
+  void allocate(float progress, AllocateHandler handler) throws Exception;
+
+  ContainerRequestBuilder addContainerRequest(Resource capability);
+
+  ContainerRequestBuilder addContainerRequest(Resource capability, int count);
+
+  /**
+   * Notifies that a container request is fulfilled.
+   *
+   * Note: This method is needed to work around what seems to be a bug in the AMRMClient implementation in
+   * YARN: if a container is requested after a previous container was acquired (with the same capability),
+   * multiple containers will get allocated instead of one.
+   *
+   * @param id The ID returned by {@link YarnAMClient.ContainerRequestBuilder#apply()}.
+   */
+  void completeContainerRequest(String id);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java
new file mode 100644
index 0000000..b2a1194
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+/**
+ * Factory for creating {@link YarnAMClient} instances.
+ */
+public interface YarnAMClientFactory {
+
+  YarnAMClient create();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
new file mode 100644
index 0000000..71a9e68
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.ProcessLauncher;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * Interface for launching a YARN application from the client.
+ */
+public interface YarnAppClient extends Service {
+
+  /**
+   * Creates a {@link ProcessLauncher} for launching the application represented by the given spec.
+   */
+  ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception;
+
+  /**
+   * Creates a {@link ProcessLauncher} for launching the application with the given user and spec.
+   *
+   * @deprecated This method will be removed.
+   */
+  @Deprecated
+  ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception;
+
+  ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java
new file mode 100644
index 0000000..70cecad
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Factory for creating {@link YarnAppClient} instances from a Hadoop {@link Configuration}.
+ */
+public interface YarnAppClientFactory {
+
+  YarnAppClient create(Configuration configuration);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java
new file mode 100644
index 0000000..4dbb1d1
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+/**
+ * This interface is for adapting differences in ApplicationReport in different Hadoop versions.
+ */
+public interface YarnApplicationReport {
+
+  /**
+   * Get the <code>ApplicationId</code> of the application.
+   * @return <code>ApplicationId</code> of the application
+   */
+  ApplicationId getApplicationId();
+
+  /**
+   * Get the <code>ApplicationAttemptId</code> of the current
+   * attempt of the application.
+   * @return <code>ApplicationAttemptId</code> of the attempt
+   */
+  ApplicationAttemptId getCurrentApplicationAttemptId();
+
+  /**
+   * Get the <em>queue</em> to which the application was submitted.
+   * @return <em>queue</em> to which the application was submitted
+   */
+  String getQueue();
+
+  /**
+   * Get the user-defined <em>name</em> of the application.
+   * @return <em>name</em> of the application
+   */
+  String getName();
+
+  /**
+   * Get the <em>host</em> on which the <code>ApplicationMaster</code>
+   * is running.
+   * @return <em>host</em> on which the <code>ApplicationMaster</code>
+   *         is running
+   */
+  String getHost();
+
+  /**
+   * Get the <em>RPC port</em> of the <code>ApplicationMaster</code>.
+   * @return <em>RPC port</em> of the <code>ApplicationMaster</code>
+   */
+  int getRpcPort();
+
+
+  /**
+   * Get the <code>YarnApplicationState</code> of the application.
+   * @return <code>YarnApplicationState</code> of the application
+   */
+  YarnApplicationState getYarnApplicationState();
+
+
+  /**
+   * Get the <em>diagnostic information</em> of the application in case of
+   * errors.
+   * @return <em>diagnostic information</em> of the application in case
+   *         of errors
+   */
+  String getDiagnostics();
+
+
+  /**
+   * Get the <em>tracking url</em> for the application.
+   * @return <em>tracking url</em> for the application
+   */
+  String getTrackingUrl();
+
+
+  /**
+   * Get the original not-proxied <em>tracking url</em> for the application.
+   * This is intended to only be used by the proxy itself.
+   * @return the original not-proxied <em>tracking url</em> for the application
+   */
+  String getOriginalTrackingUrl();
+
+  /**
+   * Get the <em>start time</em> of the application.
+   * @return <em>start time</em> of the application
+   */
+  long getStartTime();
+
+
+  /**
+   * Get the <em>finish time</em> of the application.
+   * @return <em>finish time</em> of the application
+   */
+  long getFinishTime();
+
+
+  /**
+   * Get the <em>final finish status</em> of the application.
+   * @return <em>final finish status</em> of the application
+   */
+  FinalApplicationStatus getFinalApplicationStatus();
+
+  /**
+   * Retrieve the structure containing the job resources for this application.
+   * @return the job resources structure for this application
+   */
+  ApplicationResourceUsageReport getApplicationResourceUsageReport();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java
new file mode 100644
index 0000000..e806da7
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.internal.ContainerInfo;
+
+/**
+ * A {@link ContainerInfo} that additionally exposes a container object through {@link #getContainer()}.
+ */
+public interface YarnContainerInfo extends ContainerInfo {
+
+  <T> T getContainer();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java
new file mode 100644
index 0000000..57e712c
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ContainerState;
+
+/**
+ * This interface is for adapting differences in ContainerStatus between Hadoop 2.0 and 2.1.
+ */
+public interface YarnContainerStatus {
+
+  String getContainerId();
+
+  ContainerState getState();
+
+  int getExitStatus();
+
+  String getDiagnostics();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java
new file mode 100644
index 0000000..984a1be
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This interface is for adapting ContainerLaunchContext in different Hadoop versions.
+ */
+public interface YarnLaunchContext {
+
+  <T> T getLaunchContext();
+
+  void setCredentials(Credentials credentials);
+
+  void setLocalResources(Map<String, YarnLocalResource> localResources);
+
+  void setServiceData(Map<String, ByteBuffer> serviceData);
+
+  Map<String, String> getEnvironment();
+
+  void setEnvironment(Map<String, String> environment);
+
+  List<String> getCommands();
+
+  void setCommands(List<String> commands);
+
+  void setApplicationACLs(Map<ApplicationAccessType, String> acls);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java
new file mode 100644
index 0000000..9bfc224
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+
+/**
+ * An adapter interface for the LocalResource class/interface in different Hadoop versions.
+ */
+public interface YarnLocalResource {
+
+  /**
+   * Returns the actual LocalResource object in Yarn.
+   */
+  <T> T getLocalResource();
+
+  /**
+   * Get the <em>location</em> of the resource to be localized.
+   * @return <em>location</em> of the resource to be localized
+   */
+  URL getResource();
+
+  /**
+   * Set <em>location</em> of the resource to be localized.
+   * @param resource <em>location</em> of the resource to be localized
+   */
+  void setResource(URL resource);
+
+  /**
+   * Get the <em>size</em> of the resource to be localized.
+   * @return <em>size</em> of the resource to be localized
+   */
+  long getSize();
+
+  /**
+   * Set the <em>size</em> of the resource to be localized.
+   * @param size <em>size</em> of the resource to be localized
+   */
+  void setSize(long size);
+
+  /**
+   * Get the original <em>timestamp</em> of the resource to be localized, used
+   * for verification.
+   * @return <em>timestamp</em> of the resource to be localized
+   */
+  long getTimestamp();
+
+  /**
+   * Set the <em>timestamp</em> of the resource to be localized, used
+   * for verification.
+   * @param timestamp <em>timestamp</em> of the resource to be localized
+   */
+  void setTimestamp(long timestamp);
+
+  /**
+   * Get the <code>LocalResourceType</code> of the resource to be localized.
+   * @return <code>LocalResourceType</code> of the resource to be localized
+   */
+  LocalResourceType getType();
+
+  /**
+   * Set the <code>LocalResourceType</code> of the resource to be localized.
+   * @param type <code>LocalResourceType</code> of the resource to be localized
+   */
+  void setType(LocalResourceType type);
+
+  /**
+   * Get the <code>LocalResourceVisibility</code> of the resource to be
+   * localized.
+   * @return <code>LocalResourceVisibility</code> of the resource to be
+   *         localized
+   */
+  LocalResourceVisibility getVisibility();
+
+  /**
+   * Set the <code>LocalResourceVisibility</code> of the resource to be
+   * localized.
+   * @param visibility <code>LocalResourceVisibility</code> of the resource to be
+   *                   localized
+   */
+  void setVisibility(LocalResourceVisibility visibility);
+
+  /**
+   * Get the <em>pattern</em> that should be used to extract entries from the
+   * archive (only used when type is <code>PATTERN</code>).
+   * @return <em>pattern</em> that should be used to extract entries from the
+   * archive.
+   */
+  String getPattern();
+
+  /**
+   * Set the <em>pattern</em> that should be used to extract entries from the
+   * archive (only used when type is <code>PATTERN</code>).
+   * @param pattern <em>pattern</em> that should be used to extract entries
+   * from the archive.
+   */
+  void setPattern(String pattern);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java
new file mode 100644
index 0000000..d863c91
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.common.Cancellable;
+
+/**
+ * Abstraction for dealing with API differences in different Hadoop YARN versions.
+ */
+public interface YarnNMClient {
+
+  /**
+   * Starts a process based on the given launch context.
+   *
+   * @param containerInfo The containerInfo that the new process will launch in.
+   * @param launchContext Contains information about the process going to start.
+   * @return A {@link Cancellable} that, when {@link Cancellable#cancel()} is invoked,
+   *         will try to shut down the process.
+   *
+   */
+  Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext launchContext);
+}