You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/11/21 22:54:28 UTC

[05/15] Initial import commit.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
new file mode 100644
index 0000000..bbd6c10
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.container;
+
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.discovery.DiscoveryService;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.internal.Arguments;
+import org.apache.twill.internal.BasicTwillContext;
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.ContainerInfo;
+import org.apache.twill.internal.EnvContainerInfo;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.ServiceMain;
+import org.apache.twill.internal.json.ArgumentsCodec;
+import org.apache.twill.internal.json.TwillSpecificationAdapter;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Entry point of the process that runs inside a container. It gathers the launch parameters from
+ * environment variables and local files, then starts a {@link TwillContainerService} for the runnable
+ * assigned to this container.
+ */
+public final class TwillContainerMain extends ServiceMain {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TwillContainerMain.class);
+
+  /**
+   * Main method for launching a {@link TwillContainerService} which runs
+   * a {@link org.apache.twill.api.TwillRunnable}.
+   *
+   * @param args command line arguments; they are not read by this method
+   * @throws Exception if launching the container service fails
+   */
+  public static void main(final String[] args) throws Exception {
+    // Try to load the secure store from localized file, which AM requested RM to localize it for this container.
+    loadSecureStore();
+
+    String zkConnectStr = System.getenv(EnvKeys.TWILL_ZK_CONNECT);
+    File twillSpecFile = new File(Constants.Files.TWILL_SPEC);
+    RunId appRunId = RunIds.fromString(System.getenv(EnvKeys.TWILL_APP_RUN_ID));
+    RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
+    String runnableName = System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
+    int instanceId = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID));
+    int instanceCount = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT));
+
+    ZKClientService zkClientService = ZKClientServices.delegate(
+      ZKClients.reWatchOnExpire(
+        ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
+                                 RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
+
+    DiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
+
+    TwillSpecification twillSpec = loadTwillSpec(twillSpecFile);
+
+    // Look up the runtime specification once and fail with a clear message if the runnable is unknown,
+    // instead of hitting a NullPointerException further down.
+    RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
+    Preconditions.checkArgument(runtimeSpec != null,
+                                "No runtime specification found for runnable %s", runnableName);
+    renameLocalFiles(runtimeSpec);
+
+    TwillRunnableSpecification runnableSpec = runtimeSpec.getRunnableSpecification();
+    ContainerInfo containerInfo = new EnvContainerInfo();
+    Arguments arguments = decodeArgs();
+    BasicTwillContext context = new BasicTwillContext(
+      runId, appRunId, containerInfo.getHost(),
+      arguments.getRunnableArguments().get(runnableName).toArray(new String[0]),
+      arguments.getArguments().toArray(new String[0]),
+      runnableSpec, instanceId, discoveryService, instanceCount,
+      containerInfo.getMemoryMB(), containerInfo.getVirtualCores()
+    );
+
+    Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
+    Service service = new TwillContainerService(context, containerInfo,
+                                                getContainerZKClient(zkClientService, appRunId, runnableName),
+                                                runId, runnableSpec, getClassLoader(),
+                                                createAppLocation(conf));
+    new TwillContainerMain().doMain(zkClientService, service);
+  }
+
+  /**
+   * Adds the credentials stored in the local credentials file to the current user. Nothing is done
+   * if security is not enabled or if the file does not exist.
+   *
+   * @throws IOException if the credentials file cannot be read
+   */
+  private static void loadSecureStore() throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+
+    File file = new File(Constants.Files.CREDENTIALS);
+    if (file.exists()) {
+      Credentials credentials = new Credentials();
+      DataInputStream input = new DataInputStream(new FileInputStream(file));
+      try {
+        credentials.readTokenStorageStream(input);
+      } finally {
+        input.close();
+      }
+
+      UserGroupInformation.getCurrentUser().addCredentials(credentials);
+      LOG.info("Secure store updated from {}", file);
+    }
+  }
+
+  /**
+   * Renames each archive local file from its suffixed name (the declared name followed by the extension
+   * of the source URI) to the declared name. Archives whose URI carries no extension are left untouched,
+   * as there is no suffixed name to rename from.
+   *
+   * @param runtimeSpec the runtime specification whose local files are processed
+   * @throws IllegalStateException if a rename fails
+   */
+  private static void renameLocalFiles(RuntimeSpecification runtimeSpec) {
+    for (LocalFile file : runtimeSpec.getLocalFiles()) {
+      if (!file.isArchive()) {
+        continue;
+      }
+
+      String path = file.getURI().toString();
+      String suffix;
+      if (path.endsWith(".tar.gz")) {
+        suffix = ".tar.gz";
+      } else {
+        int dotIdx = path.lastIndexOf('.');
+        if (dotIdx < 0) {
+          // No '.' in the URI: calling substring(-1) would throw, and no suffix applies to this file.
+          continue;
+        }
+        suffix = path.substring(dotIdx);
+      }
+
+      String name = file.getName() + suffix;
+      Preconditions.checkState(new File(name).renameTo(new File(file.getName())),
+                               "Fail to rename file from %s to %s.",
+                               name, file.getName());
+    }
+  }
+
+  /**
+   * Returns a {@link ZKClient} that is namespaced under {@code /[appRunId]/runnables/[runnableName]}.
+   */
+  private static ZKClient getContainerZKClient(ZKClient zkClient, RunId appRunId, String runnableName) {
+    return ZKClients.namespace(zkClient, String.format("/%s/runnables/%s", appRunId, runnableName));
+  }
+
+  /**
+   * Returns the ClassLoader for the runnable. It is the context ClassLoader of the current thread, or
+   * the system ClassLoader if the thread has none.
+   */
+  private static ClassLoader getClassLoader() {
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    if (classLoader == null) {
+      return ClassLoader.getSystemClassLoader();
+    }
+    return classLoader;
+  }
+
+  /**
+   * Reads a {@link TwillSpecification} from the given JSON file, decoded as UTF-8.
+   *
+   * @throws IOException if the file cannot be read
+   */
+  private static TwillSpecification loadTwillSpec(File specFile) throws IOException {
+    Reader reader = Files.newReader(specFile, Charsets.UTF_8);
+    try {
+      return TwillSpecificationAdapter.create().fromJson(reader);
+    } finally {
+      reader.close();
+    }
+  }
+
+  /**
+   * Decodes the {@link Arguments} from the local arguments file, read as UTF-8.
+   *
+   * @throws IOException if the file cannot be read
+   */
+  private static Arguments decodeArgs() throws IOException {
+    return ArgumentsCodec.decode(Files.newReaderSupplier(new File(Constants.Files.ARGUMENTS), Charsets.UTF_8));
+  }
+
+  /**
+   * Returns the host name as given by the {@link EnvKeys#YARN_CONTAINER_HOST} environment variable.
+   */
+  @Override
+  protected String getHostname() {
+    return System.getenv(EnvKeys.YARN_CONTAINER_HOST);
+  }
+
+  /**
+   * Returns the value of the {@link EnvKeys#TWILL_LOG_KAFKA_ZK} environment variable.
+   */
+  @Override
+  protected String getKafkaZKConnect() {
+    return System.getenv(EnvKeys.TWILL_LOG_KAFKA_ZK);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java b/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
new file mode 100644
index 0000000..f5bc1f2
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/container/TwillContainerService.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.container;
+
+import org.apache.twill.api.Command;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.TwillRunnable;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.common.Threads;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.AbstractTwillService;
+import org.apache.twill.internal.AbstractTwillService;
+import org.apache.twill.internal.BasicTwillContext;
+import org.apache.twill.internal.ContainerInfo;
+import org.apache.twill.internal.ContainerLiveNodeData;
+import org.apache.twill.internal.ZKServiceDecorator;
+import org.apache.twill.internal.logging.Loggings;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.MessageCallback;
+import org.apache.twill.internal.utils.Instances;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * This class act as a yarn container and run a {@link org.apache.twill.api.TwillRunnable}.
+ */
+public final class TwillContainerService extends AbstractTwillService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TwillContainerService.class);
+
+  private final TwillRunnableSpecification specification;
+  private final ClassLoader classLoader;
+  private final ContainerLiveNodeData containerLiveNode;
+  private final BasicTwillContext context;
+  private final ZKServiceDecorator serviceDelegate;
+  private ExecutorService commandExecutor;
+  private TwillRunnable runnable;
+
+  public TwillContainerService(BasicTwillContext context, ContainerInfo containerInfo, ZKClient zkClient,
+                               RunId runId, TwillRunnableSpecification specification, ClassLoader classLoader,
+                               Location applicationLocation) {
+    super(applicationLocation);
+
+    this.specification = specification;
+    this.classLoader = classLoader;
+    this.serviceDelegate = new ZKServiceDecorator(zkClient, runId, createLiveNodeSupplier(), new ServiceDelegate());
+    this.context = context;
+    this.containerLiveNode = new ContainerLiveNodeData(containerInfo.getId(),
+                                                       containerInfo.getHost().getCanonicalHostName());
+  }
+
+  private ListenableFuture<String> processMessage(final String messageId, final Message message) {
+    LOG.debug("Message received: {} {}.", messageId, message);
+
+    if (handleSecureStoreUpdate(message)) {
+      return Futures.immediateFuture(messageId);
+    }
+
+    final SettableFuture<String> result = SettableFuture.create();
+    Command command = message.getCommand();
+    if (message.getType() == Message.Type.SYSTEM
+          && "instances".equals(command.getCommand()) && command.getOptions().containsKey("count")) {
+      context.setInstanceCount(Integer.parseInt(command.getOptions().get("count")));
+    }
+
+    commandExecutor.execute(new Runnable() {
+
+      @Override
+      public void run() {
+        try {
+          runnable.handleCommand(message.getCommand());
+          result.set(messageId);
+        } catch (Exception e) {
+          result.setException(e);
+        }
+      }
+    });
+    return result;
+  }
+
+  private Supplier<? extends JsonElement> createLiveNodeSupplier() {
+    return new Supplier<JsonElement>() {
+      @Override
+      public JsonElement get() {
+        return new Gson().toJsonTree(containerLiveNode);
+      }
+    };
+  }
+
+  @Override
+  protected Service getServiceDelegate() {
+    return serviceDelegate;
+  }
+
+  private final class ServiceDelegate extends AbstractExecutionThreadService implements MessageCallback {
+
+    @Override
+    protected void startUp() throws Exception {
+      commandExecutor = Executors.newSingleThreadExecutor(
+        Threads.createDaemonThreadFactory("runnable-command-executor"));
+
+      Class<?> runnableClass = classLoader.loadClass(specification.getClassName());
+      Preconditions.checkArgument(TwillRunnable.class.isAssignableFrom(runnableClass),
+                                  "Class %s is not instance of TwillRunnable.", specification.getClassName());
+
+      runnable = Instances.newInstance((Class<TwillRunnable>) runnableClass);
+      runnable.initialize(context);
+    }
+
+    @Override
+    protected void triggerShutdown() {
+      try {
+        runnable.stop();
+      } catch (Throwable t) {
+        LOG.error("Exception when stopping runnable.", t);
+      }
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      commandExecutor.shutdownNow();
+      runnable.destroy();
+      Loggings.forceFlush();
+    }
+
+    @Override
+    protected void run() throws Exception {
+      runnable.run();
+    }
+
+    @Override
+    public ListenableFuture<String> onReceived(String messageId, Message message) {
+      if (state() == State.RUNNING) {
+        // Only process message if the service is still alive
+        return processMessage(messageId, message);
+      }
+      return Futures.immediateFuture(messageId);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java b/yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
new file mode 100644
index 0000000..b810854
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.utils.Paths;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for process launchers that launch processes on YARN. It implements the step-by-step
+ * {@link PrepareLaunchContext} for filling in a launch context and leaves the actual launching to
+ * subclasses through {@link #doLaunch(YarnLaunchContext)}.
+ *
+ * @param <T> Type of the object that contains information about the container that the process is going to launch.
+ */
+public abstract class AbstractYarnProcessLauncher<T> implements ProcessLauncher<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnProcessLauncher.class);
+
+  private final T containerInfo;
+
+  protected AbstractYarnProcessLauncher(T containerInfo) {
+    this.containerInfo = containerInfo;
+  }
+
+  @Override
+  public T getContainerInfo() {
+    return containerInfo;
+  }
+
+  /**
+   * Starts preparing a launch with the given environment, local resources and credentials.
+   *
+   * @param environments initial environment of the process; the map is copied
+   * @param resources local files to be added as local resources
+   * @param credentials either {@code null} or an instance of {@link Credentials}
+   * @throws IllegalArgumentException if {@code credentials} is neither {@code null} nor a {@link Credentials}
+   */
+  @Override
+  public <C> PrepareLaunchContext prepareLaunch(Map<String, String> environments,
+                                                Iterable<LocalFile> resources, C credentials) {
+    Preconditions.checkArgument(credentials == null || credentials instanceof Credentials,
+                                "Credentials should be of type %s", Credentials.class.getName());
+    return new PrepareLaunchContextImpl(environments, resources, (Credentials) credentials);
+  }
+
+  /**
+   * Tells whether the file extension gets appended to the local resource name of archive files.
+   * Default is true.
+   */
+  protected boolean useArchiveSuffix() {
+    return true;
+  }
+
+  /**
+   * Performs the actual process launching. To be implemented by subclasses.
+   */
+  protected abstract <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext);
+
+  /**
+   * The {@link PrepareLaunchContext} implementation. It accumulates local resources, environment
+   * and commands, and hands them to the launch context as the preparation steps advance.
+   */
+  private final class PrepareLaunchContextImpl implements PrepareLaunchContext {
+
+    private final Credentials credentials;
+    private final YarnLaunchContext launchContext;
+    private final Map<String, YarnLocalResource> localResources;
+    private final Map<String, String> environment;
+    private final List<String> commands;
+
+    private PrepareLaunchContextImpl(Map<String, String> env, Iterable<LocalFile> localFiles, Credentials credentials) {
+      this.credentials = credentials;
+      this.launchContext = YarnUtils.createLaunchContext();
+      this.localResources = Maps.newHashMap();
+      this.environment = Maps.newHashMap(env);
+      this.commands = Lists.newLinkedList();
+
+      for (LocalFile each : localFiles) {
+        addLocalFile(each);
+      }
+    }
+
+    /**
+     * Registers the given file as a local resource under its resource name.
+     */
+    private void addLocalFile(LocalFile localFile) {
+      localResources.put(resolveResourceName(localFile), YarnUtils.createLocalResource(localFile));
+    }
+
+    /**
+     * Determines the resource name of a local file. For archives, the file extension is appended
+     * (when {@link #useArchiveSuffix()} allows it) so that archive expansion by Yarn could work.
+     * Renaming would happen by the Container Launcher.
+     */
+    private String resolveResourceName(LocalFile localFile) {
+      String baseName = localFile.getName();
+      if (!localFile.isArchive() || !useArchiveSuffix()) {
+        return baseName;
+      }
+      String extension = Paths.getExtension(localFile.getURI().toString());
+      return extension.isEmpty() ? baseName : baseName + '.' + extension;
+    }
+
+    @Override
+    public ResourcesAdder withResources() {
+      return new MoreResourcesImpl();
+    }
+
+    @Override
+    public AfterResources noResources() {
+      return new MoreResourcesImpl();
+    }
+
+    /**
+     * Step for adding more local resources. Leaving this step sets the local resources on the launch context.
+     */
+    private final class MoreResourcesImpl implements MoreResources {
+
+      @Override
+      public MoreResources add(LocalFile localFile) {
+        addLocalFile(localFile);
+        return this;
+      }
+
+      @Override
+      public EnvironmentAdder withEnvironment() {
+        return finish();
+      }
+
+      @Override
+      public AfterEnvironment noEnvironment() {
+        return finish();
+      }
+
+      private MoreEnvironmentImpl finish() {
+        launchContext.setLocalResources(localResources);
+        return new MoreEnvironmentImpl();
+      }
+    }
+
+    /**
+     * Step for adding more environment entries. Moving on to the commands sets the environment
+     * on the launch context.
+     */
+    private final class MoreEnvironmentImpl implements MoreEnvironment {
+
+      @Override
+      public CommandAdder withCommands() {
+        launchContext.setEnvironment(environment);
+        return new MoreCommandImpl();
+      }
+
+      @Override
+      public <V> MoreEnvironment add(String key, V value) {
+        String text = value.toString();
+        environment.put(key, text);
+        return this;
+      }
+    }
+
+    /**
+     * Step for composing commands. A command is accumulated in a buffer together with its redirections
+     * and gets added to the command list by {@link #noError()} or {@link #redirectError(String)}.
+     */
+    private final class MoreCommandImpl implements MoreCommand, StdOutSetter, StdErrSetter {
+
+      private final StringBuilder commandBuilder = new StringBuilder();
+
+      @Override
+      public StdOutSetter add(String cmd, String... args) {
+        commandBuilder.append(cmd);
+        for (int i = 0; i < args.length; i++) {
+          commandBuilder.append(' ').append(args[i]);
+        }
+        return this;
+      }
+
+      @Override
+      public <R> ProcessController<R> launch() {
+        boolean hasTokens = credentials != null && !credentials.getAllTokens().isEmpty();
+        if (hasTokens) {
+          for (Token<?> token : credentials.getAllTokens()) {
+            LOG.info("Launch with delegation token {}", token);
+          }
+          launchContext.setCredentials(credentials);
+        }
+        launchContext.setCommands(commands);
+        return doLaunch(launchContext);
+      }
+
+      @Override
+      public MoreCommand redirectError(String stderr) {
+        redirect(2, stderr);
+        return noError();
+      }
+
+      @Override
+      public MoreCommand noError() {
+        // The command is complete; move it to the command list and reset the buffer for the next one.
+        String command = commandBuilder.toString();
+        commands.add(command);
+        commandBuilder.setLength(0);
+        return this;
+      }
+
+      @Override
+      public StdErrSetter redirectOutput(String stdout) {
+        redirect(1, stdout);
+        return this;
+      }
+
+      @Override
+      public StdErrSetter noOutput() {
+        return this;
+      }
+
+      /**
+       * Appends a redirection of the given file descriptor to a file under the log directory expansion variable.
+       */
+      private void redirect(int type, String out) {
+        commandBuilder.append(" " + type + ">" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + out);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java b/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
new file mode 100644
index 0000000..6f47b6c
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@link YarnAMClientFactory} that picks the {@link YarnAMClient} implementation class by name,
+ * depending on what {@link YarnUtils#isHadoop20()} reports, and instantiates it through reflection.
+ */
+public final class VersionDetectYarnAMClientFactory implements YarnAMClientFactory {
+
+  private final Configuration conf;
+
+  public VersionDetectYarnAMClientFactory(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Creates the client by calling the constructor of the selected implementation class that takes a
+   * {@link Configuration}. Any failure is propagated through {@link Throwables#propagate(Throwable)}.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public YarnAMClient create() {
+    try {
+      // Uses hadoop-2.0 class if running on hadoop-2.0, otherwise uses hadoop-2.1 class
+      String simpleName = YarnUtils.isHadoop20() ? ".Hadoop20YarnAMClient" : ".Hadoop21YarnAMClient";
+      String clzName = getClass().getPackage().getName() + simpleName;
+      Class<YarnAMClient> clz = (Class<YarnAMClient>) Class.forName(clzName);
+
+      return clz.getConstructor(Configuration.class).newInstance(conf);
+
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java b/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
new file mode 100644
index 0000000..f9db959
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@link YarnAppClientFactory} that picks the {@link YarnAppClient} implementation class by name,
+ * depending on what {@link YarnUtils#isHadoop20()} reports, and instantiates it through reflection.
+ */
+public final class VersionDetectYarnAppClientFactory implements YarnAppClientFactory {
+
+  /**
+   * Creates the client by calling the constructor of the selected implementation class that takes a
+   * {@link Configuration}. Any failure is propagated through {@link Throwables#propagate(Throwable)}.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public YarnAppClient create(Configuration configuration) {
+    try {
+      // Uses hadoop-2.0 class if running on hadoop-2.0, otherwise uses hadoop-2.1 class
+      String simpleName = YarnUtils.isHadoop20() ? ".Hadoop20YarnAppClient" : ".Hadoop21YarnAppClient";
+      String clzName = getClass().getPackage().getName() + simpleName;
+      Class<YarnAppClient> clz = (Class<YarnAppClient>) Class.forName(clzName);
+
+      return clz.getConstructor(Configuration.class).newInstance(configuration);
+
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
new file mode 100644
index 0000000..83ba6a8
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.internal.ProcessLauncher;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This interface provides an abstraction for the AM to interact with YARN. It abstracts out YARN version
+ * specific code, making multi-version compatibility easier.
+ */
+public interface YarnAMClient extends Service {
+
+  /**
+   * Builder for creating a container request. It collects the hosts, racks and priority of the request;
+   * submitting the request is left to the implementation of {@link #apply()}.
+   */
+  abstract class ContainerRequestBuilder {
+
+    protected final Resource capability;
+    protected final int count;
+    protected final Set<String> hosts = Sets.newHashSet();
+    protected final Set<String> racks = Sets.newHashSet();
+    protected final Priority priority = Records.newRecord(Priority.class);
+
+    protected ContainerRequestBuilder(Resource capability, int count) {
+      this.capability = capability;
+      this.count = count;
+    }
+
+    /**
+     * Adds one or more hosts to this request.
+     */
+    public ContainerRequestBuilder addHosts(String firstHost, String... moreHosts) {
+      hosts.add(firstHost);
+      Collections.addAll(hosts, moreHosts);
+      return this;
+    }
+
+    /**
+     * Adds one or more racks to this request.
+     */
+    public ContainerRequestBuilder addRacks(String firstRack, String... moreRacks) {
+      racks.add(firstRack);
+      Collections.addAll(racks, moreRacks);
+      return this;
+    }
+
+    /**
+     * Sets the priority of this request.
+     */
+    public ContainerRequestBuilder setPriority(int prio) {
+      priority.setPriority(prio);
+      return this;
+    }
+
+    /**
+     * Adds a container request. Returns a unique ID for the request.
+     */
+    public abstract String apply();
+  }
+
+  ContainerId getContainerId();
+
+  String getHost();
+
+  /**
+   * Sets the tracker address and tracker url. This method should be called before calling {@link #start()}.
+   */
+  void setTracker(InetSocketAddress trackerAddr, URL trackerUrl);
+
+  /**
+   * Callback for allocate call.
+   */
+  // TODO: Move AM heartbeat logic into this interface so AM only needs to handle callback.
+  interface AllocateHandler {
+    void acquired(List<ProcessLauncher<YarnContainerInfo>> launchers);
+
+    void completed(List<YarnContainerStatus> completed);
+  }
+
+  /**
+   * Performs an allocate call with the given progress. The given handler is the callback for the call.
+   */
+  void allocate(float progress, AllocateHandler handler) throws Exception;
+
+  /**
+   * Returns a builder for a container request of the given capability.
+   */
+  ContainerRequestBuilder addContainerRequest(Resource capability);
+
+  /**
+   * Returns a builder for a container request of the given capability and count.
+   */
+  ContainerRequestBuilder addContainerRequest(Resource capability, int count);
+
+  /**
+   * Notifies that a container request is fulfilled.
+   *
+   * Note: This method is needed to work around what seems to be a bug in the AMRMClient implementation in
+   * YARN: if a container is requested after a previous container was acquired (with the same capability),
+   * multiple containers will get allocated instead of one.
+   *
+   * @param id The ID returned by {@link YarnAMClient.ContainerRequestBuilder#apply()}.
+   */
+  void completeContainerRequest(String id);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java
new file mode 100644
index 0000000..b2a1194
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClientFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+/**
+ * Factory for creating {@link YarnAMClient} instances.
+ */
+public interface YarnAMClientFactory {
+
+  /**
+   * Creates a {@link YarnAMClient}.
+   */
+  YarnAMClient create();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
new file mode 100644
index 0000000..71a9e68
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.ProcessLauncher;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * Interface for launching Yarn application from client.
+ */
+public interface YarnAppClient extends Service {
+
+  /**
+   * Creates a {@link ProcessLauncher} for launching the application represented by the given spec.
+   *
+   * @param twillSpec the specification of the application to launch
+   * @return a launcher parameterized with {@link ApplicationId}
+   * @throws Exception if the launcher cannot be created
+   */
+  ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception;
+
+  /**
+   * Creates a {@link ProcessLauncher} for launching application with the given user and spec.
+   *
+   * @param user the user to launch the application with
+   * @param twillSpec the specification of the application to launch
+   * @return a launcher parameterized with {@link ApplicationId}
+   * @throws Exception if the launcher cannot be created
+   * @deprecated This method will get removed.
+   */
+  @Deprecated
+  ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception;
+
+  /**
+   * Creates a {@link ProcessController} for the application identified by the given application ID.
+   *
+   * @param appId the ID of the application
+   * @return a controller parameterized with {@link YarnApplicationReport}
+   */
+  ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java
new file mode 100644
index 0000000..70cecad
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClientFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Factory for creating {@link YarnAppClient} instances from a Hadoop {@link Configuration}.
+ */
+public interface YarnAppClientFactory {
+
+  /**
+   * Creates a {@link YarnAppClient} using the given configuration.
+   *
+   * @param configuration the Hadoop configuration to use
+   */
+  YarnAppClient create(Configuration configuration);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java
new file mode 100644
index 0000000..4dbb1d1
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnApplicationReport.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+/**
+ * This interface is for adapting differences in ApplicationReport in different Hadoop versions.
+ */
+public interface YarnApplicationReport {
+
+  /**
+   * Get the <code>ApplicationId</code> of the application.
+   * @return <code>ApplicationId</code> of the application
+   */
+  ApplicationId getApplicationId();
+
+  /**
+   * Get the <code>ApplicationAttemptId</code> of the current
+   * attempt of the application.
+   * @return <code>ApplicationAttemptId</code> of the attempt
+   */
+  ApplicationAttemptId getCurrentApplicationAttemptId();
+
+  /**
+   * Get the <em>queue</em> to which the application was submitted.
+   * @return <em>queue</em> to which the application was submitted
+   */
+  String getQueue();
+
+  /**
+   * Get the user-defined <em>name</em> of the application.
+   * @return <em>name</em> of the application
+   */
+  String getName();
+
+  /**
+   * Get the <em>host</em> on which the <code>ApplicationMaster</code>
+   * is running.
+   * @return <em>host</em> on which the <code>ApplicationMaster</code>
+   *         is running
+   */
+  String getHost();
+
+  /**
+   * Get the <em>RPC port</em> of the <code>ApplicationMaster</code>.
+   * @return <em>RPC port</em> of the <code>ApplicationMaster</code>
+   */
+  int getRpcPort();
+
+
+  /**
+   * Get the <code>YarnApplicationState</code> of the application.
+   * @return <code>YarnApplicationState</code> of the application
+   */
+  YarnApplicationState getYarnApplicationState();
+
+
+  /**
+   * Get the <em>diagnostic information</em> of the application in case of
+   * errors.
+   * @return <em>diagnostic information</em> of the application in case
+   *         of errors
+   */
+  String getDiagnostics();
+
+
+  /**
+   * Get the <em>tracking url</em> for the application.
+   * @return <em>tracking url</em> for the application
+   */
+  String getTrackingUrl();
+
+
+  /**
+   * Get the original not-proxied <em>tracking url</em> for the application.
+   * This is intended to only be used by the proxy itself.
+   * @return the original not-proxied <em>tracking url</em> for the application
+   */
+  String getOriginalTrackingUrl();
+
+  /**
+   * Get the <em>start time</em> of the application.
+   * @return <em>start time</em> of the application
+   */
+  long getStartTime();
+
+
+  /**
+   * Get the <em>finish time</em> of the application.
+   * @return <em>finish time</em> of the application
+   */
+  long getFinishTime();
+
+
+  /**
+   * Get the <em>final finish status</em> of the application.
+   * @return <em>final finish status</em> of the application
+   */
+  FinalApplicationStatus getFinalApplicationStatus();
+
+  /**
+   * Retrieve the structure containing the job resources for this application.
+   * @return the job resources structure for this application
+   */
+  ApplicationResourceUsageReport getApplicationResourceUsageReport();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java
new file mode 100644
index 0000000..e806da7
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerInfo.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.internal.ContainerInfo;
+
+/**
+ * A {@link ContainerInfo} that additionally exposes an underlying container object.
+ */
+public interface YarnContainerInfo extends ContainerInfo {
+
+  /**
+   * Returns the underlying container object. The type {@code T} is chosen by the caller.
+   * NOTE(review): presumably the YARN Container record of the Hadoop version in use; confirm with implementations.
+   */
+  <T> T getContainer();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java
new file mode 100644
index 0000000..57e712c
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnContainerStatus.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ContainerState;
+
+/**
+ * This interface is for adapting differences in ContainerStatus between Hadoop 2.0 and 2.1.
+ */
+public interface YarnContainerStatus {
+
+  /**
+   * Returns the container ID as a string.
+   */
+  String getContainerId();
+
+  /**
+   * Returns the {@link ContainerState} of the container.
+   */
+  ContainerState getState();
+
+  /**
+   * Returns the exit status of the container.
+   */
+  int getExitStatus();
+
+  /**
+   * Returns the diagnostics message of the container.
+   */
+  String getDiagnostics();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java
new file mode 100644
index 0000000..984a1be
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLaunchContext.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This interface is for adapting ContainerLaunchContext in different Hadoop versions.
+ */
+public interface YarnLaunchContext {
+
+  /**
+   * Returns the underlying launch context object. The type {@code T} is chosen by the caller.
+   * NOTE(review): presumably the YARN ContainerLaunchContext of the Hadoop version in use; confirm.
+   */
+  <T> T getLaunchContext();
+
+  /**
+   * Sets the {@link Credentials} for the launch context.
+   */
+  void setCredentials(Credentials credentials);
+
+  /**
+   * Sets the local resources, keyed by string.
+   * NOTE(review): presumably the key is the resource name used for localization; confirm with callers.
+   */
+  void setLocalResources(Map<String, YarnLocalResource> localResources);
+
+  /**
+   * Sets the service data, keyed by string.
+   */
+  void setServiceData(Map<String, ByteBuffer> serviceData);
+
+  /**
+   * Returns the environment as a map from variable name to value.
+   */
+  Map<String, String> getEnvironment();
+
+  /**
+   * Sets the environment as a map from variable name to value.
+   */
+  void setEnvironment(Map<String, String> environment);
+
+  /**
+   * Returns the list of commands.
+   */
+  List<String> getCommands();
+
+  /**
+   * Sets the list of commands.
+   */
+  void setCommands(List<String> commands);
+
+  /**
+   * Sets the application ACLs, keyed by {@link ApplicationAccessType}.
+   */
+  void setApplicationACLs(Map<ApplicationAccessType, String> acls);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java
new file mode 100644
index 0000000..9bfc224
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnLocalResource.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+
+/**
+ * An adapter interface for the LocalResource class/interface in different Hadoop versions.
+ */
+public interface YarnLocalResource {
+
+  /**
+   * Returns the actual LocalResource object in Yarn. The type {@code T} is chosen by the caller.
+   */
+  <T> T getLocalResource();
+
+  /**
+   * Get the <em>location</em> of the resource to be localized.
+   * @return <em>location</em> of the resource to be localized
+   */
+  URL getResource();
+
+  /**
+   * Set <em>location</em> of the resource to be localized.
+   * @param resource <em>location</em> of the resource to be localized
+   */
+  void setResource(URL resource);
+
+  /**
+   * Get the <em>size</em> of the resource to be localized.
+   * @return <em>size</em> of the resource to be localized
+   */
+  long getSize();
+
+  /**
+   * Set the <em>size</em> of the resource to be localized.
+   * @param size <em>size</em> of the resource to be localized
+   */
+  void setSize(long size);
+
+  /**
+   * Get the original <em>timestamp</em> of the resource to be localized, used
+   * for verification.
+   * @return <em>timestamp</em> of the resource to be localized
+   */
+  long getTimestamp();
+
+  /**
+   * Set the <em>timestamp</em> of the resource to be localized, used
+   * for verification.
+   * @param timestamp <em>timestamp</em> of the resource to be localized
+   */
+  void setTimestamp(long timestamp);
+
+  /**
+   * Get the <code>LocalResourceType</code> of the resource to be localized.
+   * @return <code>LocalResourceType</code> of the resource to be localized
+   */
+  LocalResourceType getType();
+
+  /**
+   * Set the <code>LocalResourceType</code> of the resource to be localized.
+   * @param type <code>LocalResourceType</code> of the resource to be localized
+   */
+  void setType(LocalResourceType type);
+
+  /**
+   * Get the <code>LocalResourceVisibility</code> of the resource to be
+   * localized.
+   * @return <code>LocalResourceVisibility</code> of the resource to be
+   *         localized
+   */
+  LocalResourceVisibility getVisibility();
+
+  /**
+   * Set the <code>LocalResourceVisibility</code> of the resource to be
+   * localized.
+   * @param visibility <code>LocalResourceVisibility</code> of the resource to be
+   *                   localized
+   */
+  void setVisibility(LocalResourceVisibility visibility);
+
+  /**
+   * Get the <em>pattern</em> that should be used to extract entries from the
+   * archive (only used when type is <code>PATTERN</code>).
+   * @return <em>pattern</em> that should be used to extract entries from the
+   * archive.
+   */
+  String getPattern();
+
+  /**
+   * Set the <em>pattern</em> that should be used to extract entries from the
+   * archive (only used when type is <code>PATTERN</code>).
+   * @param pattern <em>pattern</em> that should be used to extract entries
+   * from the archive.
+   */
+  void setPattern(String pattern);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java
new file mode 100644
index 0000000..d863c91
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnNMClient.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.common.Cancellable;
+
+/**
+ * Abstraction for dealing with API differences in different Hadoop YARN versions.
+ */
+public interface YarnNMClient {
+
+  /**
+   * Starts a process based on the given launch context.
+   *
+   * @param containerInfo The containerInfo that the new process will launch in.
+   * @param launchContext Contains information about the process going to start.
+   * @return A {@link Cancellable} that, when {@link Cancellable#cancel()} is invoked,
+   *         will try to shut down the process.
+   *
+   */
+  Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext launchContext);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
new file mode 100644
index 0000000..4f7597b
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.filesystem.ForwardingLocationFactory;
+import org.apache.twill.filesystem.HDFSLocationFactory;
+import org.apache.twill.filesystem.LocationFactory;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Collection of helper methods to simplify YARN calls.
+ * All members are static; the class cannot be instantiated.
+ */
+public class YarnUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(YarnUtils.class);
+  // Caches the outcome of the probe performed by isHadoop20(); holds null until the first probe has run.
+  private static final AtomicReference<Boolean> HADOOP_20 = new AtomicReference<Boolean>();
+
+  /**
+   * Creates a {@link YarnLocalResource} describing the given {@link LocalFile}.
+   * The resource gets {@link LocalResourceVisibility#APPLICATION} visibility and takes its URL, timestamp,
+   * size and type (and pattern, if any) from the local file.
+   *
+   * @param localFile the file to describe; its last modified time and size must be non-negative
+   * @return a newly created {@link YarnLocalResource}
+   * @throws IllegalArgumentException if the last modified time or the size is negative
+   */
+  public static YarnLocalResource createLocalResource(LocalFile localFile) {
+    Preconditions.checkArgument(localFile.getLastModified() >= 0, "Last modified time should be >= 0.");
+    Preconditions.checkArgument(localFile.getSize() >= 0, "File size should be >= 0.");
+
+    YarnLocalResource resource = createAdapter(YarnLocalResource.class);
+    resource.setVisibility(LocalResourceVisibility.APPLICATION);
+    resource.setResource(ConverterUtils.getYarnUrlFromURI(localFile.getURI()));
+    resource.setTimestamp(localFile.getLastModified());
+    resource.setSize(localFile.getSize());
+    return setLocalResourceType(resource, localFile);
+  }
+
+  /**
+   * Creates a new {@link YarnLaunchContext} adapter matching the Hadoop version detected by {@link #isHadoop20()}.
+   */
+  public static YarnLaunchContext createLaunchContext() {
+    return createAdapter(YarnLaunchContext.class);
+  }
+
+  /**
+   * Temporary workaround since older versions of hadoop don't have the getVirtualCores method.
+   * The method is looked up and invoked reflectively.
+   *
+   * @param resource the resource to read the virtual cores from
+   * @return the virtual cores of the resource, or 0 if the method is unavailable or the call fails
+   */
+  public static int getVirtualCores(Resource resource) {
+    try {
+      Method getVirtualCores = Resource.class.getMethod("getVirtualCores");
+      return (Integer) getVirtualCores.invoke(resource);
+    } catch (Exception e) {
+      // Deliberately ignored: the method does not exist in older versions of the API.
+      return 0;
+    }
+  }
+
+  /**
+   * Temporary workaround since older versions of hadoop don't have the setVirtualCores method.
+   * The method is looked up and invoked reflectively.
+   *
+   * @param resource the resource to set the virtual cores on
+   * @param cores the number of virtual cores to set
+   * @return true if virtual cores was set, false if not.
+   */
+  public static boolean setVirtualCores(Resource resource, int cores) {
+    try {
+      Method setVirtualCores = Resource.class.getMethod("setVirtualCores", int.class);
+      setVirtualCores.invoke(resource, cores);
+    } catch (Exception e) {
+      // It's ok to ignore this exception, as it's using older version of API.
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Creates {@link ApplicationId} from the given cluster timestamp and id.
+   * Uses the static {@code newInstance} method when it exists, otherwise falls back to creating a record
+   * and calling its setters. Any failure is rethrown through {@link Throwables#propagate(Throwable)}.
+   *
+   * @param timestamp the cluster timestamp
+   * @param id the application id number
+   * @return the resulting {@link ApplicationId}
+   */
+  public static ApplicationId createApplicationId(long timestamp, int id) {
+    try {
+      try {
+        // For Hadoop-2.1
+        Method method = ApplicationId.class.getMethod("newInstance", long.class, int.class);
+        return (ApplicationId) method.invoke(null, timestamp, id);
+      } catch (NoSuchMethodException e) {
+        // Try with Hadoop-2.0 way
+        ApplicationId appId = Records.newRecord(ApplicationId.class);
+
+        Method setClusterTimestamp = ApplicationId.class.getMethod("setClusterTimestamp", long.class);
+        Method setId = ApplicationId.class.getMethod("setId", int.class);
+
+        setClusterTimestamp.invoke(appId, timestamp);
+        setId.invoke(appId, id);
+
+        return appId;
+      }
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Helper method to get delegation tokens for the given LocationFactory.
+   * Returns an empty list when security is not enabled or when no Hadoop {@link FileSystem} can be obtained
+   * from the location factory.
+   *
+   * @param config The hadoop configuration.
+   * @param locationFactory The LocationFactory for generating tokens.
+   * @param credentials Credentials for storing tokens acquired.
+   * @return List of delegation Tokens acquired.
+   * @throws IOException if the token renewer cannot be determined or acquiring the tokens fails
+   */
+  public static List<Token<?>> addDelegationTokens(Configuration config,
+                                                   LocationFactory locationFactory,
+                                                   Credentials credentials) throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      LOG.debug("Security is not enabled");
+      return ImmutableList.of();
+    }
+
+    FileSystem fileSystem = getFileSystem(locationFactory);
+
+    if (fileSystem == null) {
+      LOG.debug("LocationFactory is not HDFS");
+      return ImmutableList.of();
+    }
+
+    String renewer = getYarnTokenRenewer(config);
+
+    Token<?>[] tokens = fileSystem.addDelegationTokens(renewer, credentials);
+    return tokens == null ? ImmutableList.<Token<?>>of() : ImmutableList.copyOf(tokens);
+  }
+
+  /**
+   * Encodes the given {@link Credentials} into a {@link ByteBuffer} using its token storage stream format.
+   *
+   * @param credentials the credentials to encode
+   * @return a buffer wrapping the encoded bytes
+   */
+  public static ByteBuffer encodeCredentials(Credentials credentials) {
+    try {
+      DataOutputBuffer out = new DataOutputBuffer();
+      credentials.writeTokenStorageToStream(out);
+      // Only the first getLength() bytes of the backing array are valid data.
+      return ByteBuffer.wrap(out.getData(), 0, out.getLength());
+    } catch (IOException e) {
+      // Shouldn't throw
+      LOG.error("Failed to encode Credentials.", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Decodes {@link Credentials} from the given buffer.
+   * If the buffer is null or empty, it returns an empty Credentials.
+   *
+   * @param buffer the buffer holding encoded credentials; may be null
+   * @return the decoded credentials, never null
+   * @throws IOException if the buffer content cannot be read as credentials
+   */
+  public static Credentials decodeCredentials(ByteBuffer buffer) throws IOException {
+    Credentials credentials = new Credentials();
+    if (buffer != null && buffer.hasRemaining()) {
+      DataInputByteBuffer in = new DataInputByteBuffer();
+      in.reset(buffer);
+      credentials.readTokenStorageStream(in);
+    }
+    return credentials;
+  }
+
+  /**
+   * Resolves the principal to use as the token renewer from the {@link YarnConfiguration#RM_PRINCIPAL}
+   * setting and the host name of the resource manager address.
+   *
+   * @param config the hadoop configuration
+   * @return the renewer principal, never null or empty
+   * @throws IOException if no principal could be resolved
+   */
+  public static String getYarnTokenRenewer(Configuration config) throws IOException {
+    String rmHost = getRMAddress(config).getHostName();
+    String renewer = SecurityUtil.getServerPrincipal(config.get(YarnConfiguration.RM_PRINCIPAL), rmHost);
+
+    if (renewer == null || renewer.length() == 0) {
+      throw new IOException("No Kerberos principal for Yarn RM to use as renewer");
+    }
+
+    return renewer;
+  }
+
+  /**
+   * Reads the resource manager address from the given configuration, using the YARN default address and port
+   * as fallbacks.
+   *
+   * @param config the hadoop configuration
+   * @return the resource manager socket address
+   */
+  public static InetSocketAddress getRMAddress(Configuration config) {
+    return config.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+                                YarnConfiguration.DEFAULT_RM_ADDRESS,
+                                YarnConfiguration.DEFAULT_RM_PORT);
+  }
+
+  /**
+   * Returns true if Hadoop-2.0 classes are in the classpath.
+   * The check is done by probing for the class {@code org.apache.hadoop.yarn.client.api.NMClient}: if it cannot
+   * be loaded, the classpath is treated as Hadoop-2.0. The result of the first probe is cached.
+   */
+  public static boolean isHadoop20() {
+    Boolean hadoop20 = HADOOP_20.get();
+    if (hadoop20 != null) {
+      return hadoop20;
+    }
+    try {
+      Class.forName("org.apache.hadoop.yarn.client.api.NMClient");
+      HADOOP_20.set(false);
+      return false;
+    } catch (ClassNotFoundException e) {
+      HADOOP_20.set(true);
+      return true;
+    }
+  }
+
+  /**
+   * Helper method to create adapter class for bridging between Hadoop 2.0 and 2.1.
+   * The implementation class is looked up by name in the package of {@code clz}, using the simple name of
+   * {@code clz} prefixed with {@code Hadoop20} or {@code Hadoop21} depending on {@link #isHadoop20()}, and is
+   * instantiated through its no-argument constructor.
+   *
+   * @param clz the adapter interface to create an implementation of
+   * @return a new instance of the matching implementation class
+   */
+  private static <T> T createAdapter(Class<T> clz) {
+    String className = clz.getPackage().getName();
+
+    if (isHadoop20()) {
+      className += ".Hadoop20" + clz.getSimpleName();
+    } else {
+      className += ".Hadoop21" + clz.getSimpleName();
+    }
+
+    try {
+      // Checked cast: an implementation that is not a clz fails right here instead of at a later call site.
+      return clz.cast(Class.forName(className).newInstance());
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Sets the type of the given resource from the given local file: FILE for a non-archive, ARCHIVE for an
+   * archive without pattern, and PATTERN (together with the pattern itself) for an archive with a pattern.
+   *
+   * @return the same {@code localResource} instance that was passed in
+   */
+  private static YarnLocalResource setLocalResourceType(YarnLocalResource localResource, LocalFile localFile) {
+    if (localFile.isArchive()) {
+      if (localFile.getPattern() == null) {
+        localResource.setType(LocalResourceType.ARCHIVE);
+      } else {
+        localResource.setType(LocalResourceType.PATTERN);
+        localResource.setPattern(localFile.getPattern());
+      }
+    } else {
+      localResource.setType(LocalResourceType.FILE);
+    }
+    return localResource;
+  }
+
+  /**
+   * Returns a view of the given map in which every {@link YarnLocalResource} value is replaced by the object
+   * returned from its {@link YarnLocalResource#getLocalResource()}.
+   * NOTE(review): not referenced anywhere within this class.
+   */
+  private static <T> Map<String, T> transformResource(Map<String, YarnLocalResource> from) {
+    return Maps.transformValues(from, new Function<YarnLocalResource, T>() {
+      @Override
+      public T apply(YarnLocalResource resource) {
+        return resource.getLocalResource();
+      }
+    });
+  }
+
+  /**
+   * Gets the Hadoop FileSystem from LocationFactory.
+   * A {@link ForwardingLocationFactory} is unwrapped recursively through its delegate.
+   *
+   * @return the file system of the underlying {@link HDFSLocationFactory}, or null if there is none
+   */
+  private static FileSystem getFileSystem(LocationFactory locationFactory) {
+    if (locationFactory instanceof HDFSLocationFactory) {
+      return ((HDFSLocationFactory) locationFactory).getFileSystem();
+    }
+    if (locationFactory instanceof ForwardingLocationFactory) {
+      return getFileSystem(((ForwardingLocationFactory) locationFactory).getDelegate());
+    }
+    return null;
+  }
+
+  // Not instantiable: this class only hosts static helpers.
+  private YarnUtils() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/yarn/package-info.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/yarn/package-info.java b/yarn/src/main/java/org/apache/twill/internal/yarn/package-info.java
new file mode 100644
index 0000000..d6ec9f7
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/yarn/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains classes for interacting with Yarn.
+ */
+package org.apache.twill.internal.yarn;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java b/yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java
new file mode 100644
index 0000000..4d20c9c
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.SecureStore;
+import org.apache.twill.api.SecureStoreUpdater;
+import org.apache.twill.filesystem.LocationFactory;
+import org.apache.twill.internal.yarn.YarnUtils;
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.IOException;
+
+/**
+ * Package private {@link SecureStoreUpdater} that gathers location delegation tokens into a new secure store.
+ */
+final class LocationSecureStoreUpdater implements SecureStoreUpdater {
+
+  private final Configuration configuration;
+  private final LocationFactory locationFactory;
+
+  LocationSecureStoreUpdater(Configuration configuration, LocationFactory locationFactory) {
+    this.configuration = configuration;
+    this.locationFactory = locationFactory;
+  }
+
+  @Override
+  public SecureStore update(String application, RunId runId) {
+    Credentials freshCredentials = new Credentials();
+    try {
+      YarnUtils.addDelegationTokens(configuration, locationFactory, freshCredentials);
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+    return YarnSecureStore.create(freshCredentials);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java b/yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
new file mode 100644
index 0000000..2974c3f
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.internal.json.ResourceReportAdapter;
+import com.google.common.base.Charsets;
+import com.google.common.io.Closeables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URL;
+
+/**
+ * Package private client that fetches a {@link ResourceReport} from the application master.
+ */
+final class ResourceReportClient {
+  private static final Logger LOG = LoggerFactory.getLogger(ResourceReportClient.class);
+
+  private final ResourceReportAdapter reportAdapter = ResourceReportAdapter.create();
+  private final URL resourceUrl;
+
+  ResourceReportClient(URL resourceUrl) {
+    this.resourceUrl = resourceUrl;
+  }
+
+  /**
+   * Reads the resource endpoint URL and decodes its JSON body into a report; failures are logged.
+   * @return A {@link ResourceReport} or {@code null} if failed to fetch the report.
+   */
+  public ResourceReport get() {
+    try {
+      Reader streamReader = new InputStreamReader(resourceUrl.openStream(), Charsets.UTF_8);
+      Reader reader = new BufferedReader(streamReader);
+      try {
+        return reportAdapter.fromJson(reader);
+      } finally {
+        Closeables.closeQuietly(reader);
+      }
+    } catch (Exception e) {
+      LOG.error("Exception getting resource report from {}.", resourceUrl, e);
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/yarn/YarnSecureStore.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/YarnSecureStore.java b/yarn/src/main/java/org/apache/twill/yarn/YarnSecureStore.java
new file mode 100644
index 0000000..e6f461a
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/yarn/YarnSecureStore.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.SecureStore;
+import org.apache.hadoop.security.Credentials;
+
+/**
+ * A {@link SecureStore} that wraps hadoop {@link Credentials}.
+ */
+public final class YarnSecureStore implements SecureStore {
+
+  private final Credentials credentials;
+
+  private YarnSecureStore(Credentials credentials) {
+    this.credentials = credentials;
+  }
+
+  public static SecureStore create(Credentials credentials) {
+    return new YarnSecureStore(credentials);
+  }
+
+  @Override
+  public Credentials getStore() {
+    return credentials;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
new file mode 100644
index 0000000..4c240fb
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.internal.AbstractTwillController;
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.appmaster.TrackerService;
+import org.apache.twill.internal.state.StateNode;
+import org.apache.twill.internal.state.SystemMessages;
+import org.apache.twill.internal.yarn.YarnApplicationReport;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link org.apache.twill.api.TwillController} that controls applications running on Hadoop YARN.
+ */
+final class YarnTwillController extends AbstractTwillController implements TwillController {
+
+  private static final Logger LOG = LoggerFactory.getLogger(YarnTwillController.class);
+
+  private final Callable<ProcessController<YarnApplicationReport>> startUp;
+  private ProcessController<YarnApplicationReport> processController;
+  private ResourceReportClient resourcesClient;
+
+  /**
+   * Creates an instance without any {@link LogHandler} by passing an empty list to the other constructor.
+   */
+  YarnTwillController(RunId runId, ZKClient zkClient, Callable<ProcessController<YarnApplicationReport>> startUp) {
+    this(runId, zkClient, ImmutableList.<LogHandler>of(), startUp);
+  }
+
+  YarnTwillController(RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers,
+                      Callable<ProcessController<YarnApplicationReport>> startUp) {
+    super(runId, zkClient, logHandlers);
+    this.startUp = startUp;  // called by doStartUp() to obtain the process controller
+  }
+
+
+  /**
+   * Sends a message to the application to notify it that the secure store has been updated.
+   */
+  ListenableFuture<Void> secureStoreUpdated() {
+    return sendMessage(SystemMessages.SECURE_STORE_UPDATED, null);
+  }
+
+  @Override
+  protected void doStartUp() {
+    super.doStartUp();
+
+    // Obtain the process controller via startUp, then poll until the application has run or the start time is up
+    try {
+      processController = startUp.call();
+
+      YarnApplicationReport report = processController.getReport();
+      LOG.debug("Application {} submit", report.getApplicationId());
+
+      YarnApplicationState state = report.getYarnApplicationState();
+      StopWatch stopWatch = new StopWatch();
+      stopWatch.start();
+      stopWatch.split();
+      long maxTime = TimeUnit.MILLISECONDS.convert(Constants.APPLICATION_MAX_START_SECONDS, TimeUnit.SECONDS);
+
+      LOG.info("Checking yarn application status");
+      while (!hasRun(state) && stopWatch.getSplitTime() < maxTime) {
+        report = processController.getReport();
+        state = report.getYarnApplicationState();
+        LOG.debug("Yarn application status: {}", state);
+        TimeUnit.SECONDS.sleep(1);
+        stopWatch.split();
+      }
+      LOG.info("Yarn application is in state {}", state);
+      if (state != YarnApplicationState.RUNNING) {
+        LOG.info("Yarn application is not in running state. Shutting down controller.");
+        forceShutDown();
+      } else {
+        try {
+          URL resourceUrl = URI.create(String.format("http://%s:%d", report.getHost(), report.getRpcPort()))
+                               .resolve(TrackerService.PATH).toURL();
+          resourcesClient = new ResourceReportClient(resourceUrl);
+        } catch (IOException e) {
+          LOG.warn("Failed to create resource report URL for {}:{}.", report.getHost(), report.getRpcPort(), e);
+          resourcesClient = null;  // getResourceReport() returns null while this is unset
+        }
+      }
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  protected void doShutDown() {
+    if (processController == null) {  // only doStartUp() assigns it, so nothing is available to stop
+      LOG.warn("No process controller for application that is not submitted.");
+      return;
+    }
+
+    // Wait up to APPLICATION_MAX_STOP_SECONDS for the stop message to be processed
+    try {
+      Uninterruptibles.getUninterruptibly(getStopMessageFuture(),
+                                          Constants.APPLICATION_MAX_STOP_SECONDS, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.error("Failed to wait for stop message being processed.", e);
+      // The wait failed, so kill the application through yarn; polling below still runs afterwards
+      kill();
+    }
+
+    // Poll the final application status from yarn once per second, for at most APPLICATION_MAX_STOP_SECONDS
+    try {
+      StopWatch stopWatch = new StopWatch();
+      stopWatch.start();
+      stopWatch.split();
+      long maxTime = TimeUnit.MILLISECONDS.convert(Constants.APPLICATION_MAX_STOP_SECONDS, TimeUnit.SECONDS);
+
+      YarnApplicationReport report = processController.getReport();
+      FinalApplicationStatus finalStatus = report.getFinalApplicationStatus();
+      while (finalStatus == FinalApplicationStatus.UNDEFINED && stopWatch.getSplitTime() < maxTime) {
+        LOG.debug("Yarn application final status for {} {}", report.getApplicationId(), finalStatus);
+        TimeUnit.SECONDS.sleep(1);
+        stopWatch.split();
+        finalStatus = processController.getReport().getFinalApplicationStatus();
+      }
+      LOG.debug("Yarn application final status is {}", finalStatus);
+
+      // Still UNDEFINED after the max stop time: the application has not finished, so kill it
+      if (finalStatus == FinalApplicationStatus.UNDEFINED) {
+        kill();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception while waiting for application report: {}", e.getMessage(), e);
+      kill();
+    }
+
+    super.doShutDown();
+  }
+
+  @Override
+  public void kill() {
+    if (processController == null) {
+      LOG.warn("No process controller for application that is not submitted.");
+      return;
+    }
+    YarnApplicationReport currentReport = processController.getReport();
+    LOG.info("Killing application {}", currentReport.getApplicationId());
+    processController.cancel();
+  }
+
+  @Override
+  protected void instanceNodeUpdated(NodeData nodeData) {
+    // No-op: this controller takes no action on instance node updates.
+  }
+
+  @Override
+  protected void stateNodeUpdated(StateNode stateNode) {
+    // No-op: this controller takes no action on state node updates.
+  }
+
+  private boolean hasRun(YarnApplicationState state) {
+    switch (state) {
+      case RUNNING:
+      case FINISHED:
+      case FAILED:
+      case KILLED:
+        return true;   // running now, or already ended in one of the three states above
+    }
+    return false;      // any other state; doStartUp() keeps polling while this is false
+  }
+
+  @Override
+  public ResourceReport getResourceReport() {
+    // the client is only created in doStartUp(), so a call made before that yields null
+    return (resourcesClient != null) ? resourcesClient.get() : null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java b/yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
new file mode 100644
index 0000000..11c2ae6
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.yarn.YarnApplicationReport;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Package private factory that creates a {@link YarnTwillController} for a run id, log handlers and start-up callable.
+ */
+interface YarnTwillControllerFactory {
+
+  YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
+                             Callable<ProcessController<YarnApplicationReport>> startUp);
+}