You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2017/01/07 20:56:52 UTC

[1/2] twill git commit: (TWILL-63) Speed up application launch time

Repository: twill
Updated Branches:
  refs/heads/master 5edc8ddbe -> 5986553ba


http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 562d1da..5f599d6 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -17,10 +17,9 @@
  */
 package org.apache.twill.yarn;
 
-import com.google.common.base.Charsets;
+import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashMultimap;
@@ -33,6 +32,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.OutputSupplier;
 import com.google.common.reflect.TypeToken;
@@ -43,6 +44,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.twill.api.ClassAcceptor;
+import org.apache.twill.api.Configs;
 import org.apache.twill.api.EventHandlerSpecification;
 import org.apache.twill.api.LocalFile;
 import org.apache.twill.api.RunId;
@@ -54,10 +56,8 @@ import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.api.logging.LogHandler;
 import org.apache.twill.filesystem.Location;
-import org.apache.twill.filesystem.LocationFactory;
 import org.apache.twill.internal.ApplicationBundler;
 import org.apache.twill.internal.Arguments;
-import org.apache.twill.internal.Configs;
 import org.apache.twill.internal.Constants;
 import org.apache.twill.internal.DefaultLocalFile;
 import org.apache.twill.internal.DefaultRuntimeSpecification;
@@ -67,11 +67,11 @@ import org.apache.twill.internal.JvmOptions;
 import org.apache.twill.internal.LogOnlyEventHandler;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.ProcessLauncher;
-import org.apache.twill.internal.RunIds;
 import org.apache.twill.internal.TwillRuntimeSpecification;
 import org.apache.twill.internal.appmaster.ApplicationMasterInfo;
 import org.apache.twill.internal.appmaster.ApplicationMasterMain;
 import org.apache.twill.internal.container.TwillContainerMain;
+import org.apache.twill.internal.io.LocationCache;
 import org.apache.twill.internal.json.ArgumentsCodec;
 import org.apache.twill.internal.json.JvmOptionsCodec;
 import org.apache.twill.internal.json.LocalFileCodec;
@@ -88,6 +88,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -96,6 +97,11 @@ import java.io.Writer;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -112,12 +118,18 @@ import java.util.jar.JarOutputStream;
 final class YarnTwillPreparer implements TwillPreparer {
 
   private static final Logger LOG = LoggerFactory.getLogger(YarnTwillPreparer.class);
+  private static final Function<Class<?>, String> CLASS_TO_NAME = new Function<Class<?>, String>() {
+    @Override
+    public String apply(Class<?> cls) {
+      return cls.getName();
+    }
+  };
 
   private final YarnConfiguration yarnConfig;
   private final TwillSpecification twillSpec;
   private final YarnAppClient yarnAppClient;
   private final String zkConnectString;
-  private final LocationFactory locationFactory;
+  private final Location appLocation;
   private final YarnTwillControllerFactory controllerFactory;
   private final RunId runId;
 
@@ -131,29 +143,37 @@ final class YarnTwillPreparer implements TwillPreparer {
   private final List<String> applicationClassPaths = Lists.newArrayList();
   private final Credentials credentials;
   private final int reservedMemory;
+  private final File localStagingDir;
   private final Map<String, Map<String, String>> logLevels = new HashMap<>();
+  private final LocationCache locationCache;
+  private final Set<URL> twillClassPaths;
   private String schedulerQueue;
   private String extraOptions;
   private JvmOptions.DebugOptions debugOptions = JvmOptions.DebugOptions.NO_DEBUG;
   private ClassAcceptor classAcceptor;
 
-  YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec,
-                    YarnAppClient yarnAppClient, String zkConnectString,
-                    LocationFactory locationFactory, String extraOptions, LogEntry.Level logLevel,
-                    YarnTwillControllerFactory controllerFactory) {
+  YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec, RunId runId,
+                    YarnAppClient yarnAppClient, String zkConnectString, Location appLocation, Set<URL> twillClassPaths,
+                    String extraOptions, LocationCache locationCache, YarnTwillControllerFactory controllerFactory) {
     this.yarnConfig = yarnConfig;
     this.twillSpec = twillSpec;
+    this.runId = runId;
     this.yarnAppClient = yarnAppClient;
     this.zkConnectString = zkConnectString;
-    this.locationFactory = locationFactory;
+    this.appLocation = appLocation;
     this.controllerFactory = controllerFactory;
-    this.runId = RunIds.generate();
     this.credentials = createCredentials();
     this.reservedMemory = yarnConfig.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB,
                                             Configs.Defaults.JAVA_RESERVED_MEMORY_MB);
+    this.localStagingDir = new File(yarnConfig.get(Configs.Keys.LOCAL_STAGING_DIRECTORY,
+                                                   Configs.Defaults.LOCAL_STAGING_DIRECTORY));
     this.extraOptions = extraOptions;
     this.classAcceptor = new ClassAcceptor();
-    saveLogLevels(logLevel);
+    this.locationCache = locationCache;
+    this.twillClassPaths = twillClassPaths;
+
+    // By default, the root logger is having INFO log level
+    setLogLevel(LogEntry.Level.INFO);
   }
 
   @Override
@@ -338,23 +358,27 @@ final class YarnTwillPreparer implements TwillPreparer {
 
           // Local files needed by AM
           Map<String, LocalFile> localFiles = Maps.newHashMap();
-          // Local files declared by runnables
-          Multimap<String, LocalFile> runnableLocalFiles = HashMultimap.create();
-
-          createAppMasterJar(createBundler(), localFiles);
-          createContainerJar(createBundler(), localFiles);
-          populateRunnableLocalFiles(twillSpec, runnableLocalFiles);
-          saveSpecification(twillSpec, runnableLocalFiles, localFiles);
-          saveLogback(localFiles);
-          saveLauncher(localFiles);
-          saveJvmOptions(localFiles);
-          saveArguments(new Arguments(arguments, runnableArgs), localFiles);
-          saveEnvironments(localFiles);
-          saveLocalFiles(localFiles, ImmutableSet.of(Constants.Files.TWILL_SPEC,
-                                                     Constants.Files.LOGBACK_TEMPLATE,
-                                                     Constants.Files.CONTAINER_JAR,
-                                                     Constants.Files.LAUNCHER_JAR,
-                                                     Constants.Files.ARGUMENTS));
+
+          createLauncherJar(localFiles);
+          createTwillJar(createBundler(classAcceptor), localFiles);
+          createApplicationJar(createApplicationJarBundler(classAcceptor), localFiles);
+          createResourcesJar(createBundler(classAcceptor), localFiles);
+
+          Path runtimeConfigDir = Files.createTempDirectory(localStagingDir.toPath(),
+                                                            Constants.Files.RUNTIME_CONFIG_JAR);
+          try {
+            saveSpecification(twillSpec, runtimeConfigDir.resolve(Constants.Files.TWILL_SPEC));
+            saveLogback(runtimeConfigDir.resolve(Constants.Files.LOGBACK_TEMPLATE));
+            saveClassPaths(runtimeConfigDir);
+            saveJvmOptions(runtimeConfigDir.resolve(Constants.Files.JVM_OPTIONS));
+            saveArguments(new Arguments(arguments, runnableArgs), runtimeConfigDir.resolve(Constants.Files.ARGUMENTS));
+            saveEnvironments(runtimeConfigDir.resolve(Constants.Files.ENVIRONMENTS));
+            createRuntimeConfigJar(runtimeConfigDir, localFiles);
+          } finally {
+            Paths.deleteRecursively(runtimeConfigDir);
+          }
+
+          createLocalizeFilesJson(localFiles);
 
           LOG.debug("Submit AM container spec: {}", appMasterInfo);
           // java -Djava.io.tmpdir=tmp -cp launcher.jar:$HADOOP_CONF_DIR -XmxMemory
@@ -375,7 +399,6 @@ final class YarnTwillPreparer implements TwillPreparer {
               "-Xmx" + memory + "m",
               extraOptions == null ? "" : extraOptions,
               TwillLauncher.class.getName(),
-              Constants.Files.APP_MASTER_JAR,
               ApplicationMasterMain.class.getName(),
               Boolean.FALSE.toString())
             .launch();
@@ -406,15 +429,6 @@ final class YarnTwillPreparer implements TwillPreparer {
     }
   }
 
-  private void saveLogLevels(LogEntry.Level level) {
-    Preconditions.checkNotNull(level);
-    Map<String, String> appLogLevels = new HashMap<>();
-    appLogLevels.put(Logger.ROOT_LOGGER_NAME, level.name());
-    for (String runnableName : twillSpec.getRunnables().keySet()) {
-      this.logLevels.put(runnableName, appLogLevels);
-    }
-  }
-
   private void saveLogLevels(String runnableName, Map<String, LogEntry.Level> logLevels) {
     Map<String, String> newLevels = new HashMap<>();
     for (Map.Entry<String, LogEntry.Level> entry : logLevels.entrySet()) {
@@ -430,9 +444,11 @@ final class YarnTwillPreparer implements TwillPreparer {
     try {
       credentials.addAll(UserGroupInformation.getCurrentUser().getCredentials());
 
-      List<Token<?>> tokens = YarnUtils.addDelegationTokens(yarnConfig, locationFactory, credentials);
-      for (Token<?> token : tokens) {
-        LOG.debug("Delegation token acquired for {}, {}", locationFactory.getHomeLocation(), token);
+      List<Token<?>> tokens = YarnUtils.addDelegationTokens(yarnConfig, appLocation.getLocationFactory(), credentials);
+      if (LOG.isDebugEnabled()) {
+        for (Token<?> token : tokens) {
+          LOG.debug("Delegation token acquired for {}, {}", appLocation, token);
+        }
       }
     } catch (IOException e) {
       LOG.warn("Failed to check for secure login type. Not gathering any delegation token.", e);
@@ -440,10 +456,6 @@ final class YarnTwillPreparer implements TwillPreparer {
     return credentials;
   }
 
-  private ApplicationBundler createBundler() {
-    return new ApplicationBundler(classAcceptor);
-  }
-
   private LocalFile createLocalFile(String name, Location location) throws IOException {
     return createLocalFile(name, location, false);
   }
@@ -452,62 +464,104 @@ final class YarnTwillPreparer implements TwillPreparer {
     return new DefaultLocalFile(name, location.toURI(), location.lastModified(), location.length(), archive, null);
   }
 
-  private void createAppMasterJar(ApplicationBundler bundler, Map<String, LocalFile> localFiles) throws IOException {
-    try {
-      LOG.debug("Create and copy {}", Constants.Files.APP_MASTER_JAR);
-      Location location = createTempLocation(Constants.Files.APP_MASTER_JAR);
+  private void createTwillJar(final ApplicationBundler bundler, Map<String, LocalFile> localFiles) throws IOException {
+    LOG.debug("Create and copy {}", Constants.Files.TWILL_JAR);
+    Location location = locationCache.get(Constants.Files.TWILL_JAR, new LocationCache.Loader() {
+      @Override
+      public void load(String name, Location targetLocation) throws IOException {
+        // Stuck in the yarnAppClient class to make bundler being able to pickup the right yarn-client version
+        bundler.createBundle(targetLocation, ApplicationMasterMain.class,
+                             yarnAppClient.getClass(), TwillContainerMain.class);
+      }
+    });
+
+    LOG.debug("Done {}", Constants.Files.TWILL_JAR);
+    localFiles.put(Constants.Files.TWILL_JAR, createLocalFile(Constants.Files.TWILL_JAR, location, true));
+  }
 
-      List<Class<?>> classes = Lists.newArrayList();
-      classes.add(ApplicationMasterMain.class);
+  private void createApplicationJar(final ApplicationBundler bundler,
+                                    Map<String, LocalFile> localFiles) throws IOException {
+    try {
+      final Set<Class<?>> classes = Sets.newIdentityHashSet();
+      classes.addAll(dependencies);
 
-      // Stuck in the yarnAppClient class to make bundler being able to pickup the right yarn-client version
-      classes.add(yarnAppClient.getClass());
+      ClassLoader classLoader = getClassLoader();
+      for (RuntimeSpecification spec : twillSpec.getRunnables().values()) {
+        classes.add(classLoader.loadClass(spec.getRunnableSpecification().getClassName()));
+      }
 
       // Add the TwillRunnableEventHandler class
       if (twillSpec.getEventHandler() != null) {
         classes.add(getClassLoader().loadClass(twillSpec.getEventHandler().getClassName()));
       }
 
-      bundler.createBundle(location, classes);
-      LOG.debug("Done {}", Constants.Files.APP_MASTER_JAR);
+      // The location name is computed from the MD5 of all the classes names
+      // The localized name is always APPLICATION_JAR
+      List<String> classList = Lists.newArrayList(Iterables.transform(classes, CLASS_TO_NAME));
+      Collections.sort(classList);
+      Hasher hasher = Hashing.md5().newHasher();
+      for (String name : classList) {
+        hasher.putString(name);
+      }
+      String name = twillSpec.getName() + "-" + hasher.hash().toString() + "-" + Constants.Files.APPLICATION_JAR;
+
+      LOG.debug("Create and copy {}", Constants.Files.APPLICATION_JAR);
+      Location location = locationCache.get(name, new LocationCache.Loader() {
+        @Override
+        public void load(String name, Location targetLocation) throws IOException {
+          bundler.createBundle(targetLocation, classes);
+        }
+      });
+
+      LOG.debug("Done {}", Constants.Files.APPLICATION_JAR);
+
+      localFiles.put(Constants.Files.APPLICATION_JAR, createLocalFile(Constants.Files.APPLICATION_JAR, location, true));
 
-      localFiles.put(Constants.Files.APP_MASTER_JAR, createLocalFile(Constants.Files.APP_MASTER_JAR, location));
     } catch (ClassNotFoundException e) {
       throw Throwables.propagate(e);
     }
   }
 
-  private void createContainerJar(ApplicationBundler bundler, Map<String, LocalFile> localFiles) throws IOException {
-    try {
-      Set<Class<?>> classes = Sets.newIdentityHashSet();
-      classes.add(TwillContainerMain.class);
-      classes.addAll(dependencies);
-
-      ClassLoader classLoader = getClassLoader();
-      for (RuntimeSpecification spec : twillSpec.getRunnables().values()) {
-        classes.add(classLoader.loadClass(spec.getRunnableSpecification().getClassName()));
-      }
+  private void createResourcesJar(ApplicationBundler bundler, Map<String, LocalFile> localFiles) throws IOException {
+    // If there is no resources, no need to create the jar file.
+    if (resources.isEmpty()) {
+      return;
+    }
 
-      LOG.debug("Create and copy {}", Constants.Files.CONTAINER_JAR);
-      Location location = createTempLocation(Constants.Files.CONTAINER_JAR);
-      bundler.createBundle(location, classes, resources);
-      LOG.debug("Done {}", Constants.Files.CONTAINER_JAR);
+    LOG.debug("Create and copy {}", Constants.Files.RESOURCES_JAR);
+    Location location = createTempLocation(Constants.Files.RESOURCES_JAR);
+    bundler.createBundle(location, Collections.<Class<?>>emptyList(), resources);
+    LOG.debug("Done {}", Constants.Files.RESOURCES_JAR);
+    localFiles.put(Constants.Files.RESOURCES_JAR, createLocalFile(Constants.Files.RESOURCES_JAR, location, true));
+  }
 
-      localFiles.put(Constants.Files.CONTAINER_JAR, createLocalFile(Constants.Files.CONTAINER_JAR, location));
+  private void createRuntimeConfigJar(Path dir, Map<String, LocalFile> localFiles) throws IOException {
+    LOG.debug("Create and copy {}", Constants.Files.RUNTIME_CONFIG_JAR);
 
-    } catch (ClassNotFoundException e) {
-      throw Throwables.propagate(e);
+    // Jar everything under the given directory, which contains different files needed by AM/runnable containers
+    Location location = createTempLocation(Constants.Files.RUNTIME_CONFIG_JAR);
+    try (
+      JarOutputStream jarOutput = new JarOutputStream(location.getOutputStream());
+      DirectoryStream<Path> stream = Files.newDirectoryStream(dir)
+    ) {
+      for (Path path : stream) {
+        jarOutput.putNextEntry(new JarEntry(path.getFileName().toString()));
+        Files.copy(path, jarOutput);
+        jarOutput.closeEntry();
+      }
     }
+
+    LOG.debug("Done {}", Constants.Files.RUNTIME_CONFIG_JAR);
+    localFiles.put(Constants.Files.RUNTIME_CONFIG_JAR,
+                   createLocalFile(Constants.Files.RUNTIME_CONFIG_JAR, location, true));
   }
 
   /**
    * Based on the given {@link TwillSpecification}, upload LocalFiles to Yarn Cluster.
    * @param twillSpec The {@link TwillSpecification} for populating resource.
-   * @param localFiles A Multimap to store runnable name to transformed LocalFiles.
-   * @throws IOException
    */
-  private void populateRunnableLocalFiles(TwillSpecification twillSpec,
-                                          Multimap<String, LocalFile> localFiles) throws IOException {
+  private Multimap<String, LocalFile> populateRunnableLocalFiles(TwillSpecification twillSpec) throws IOException {
+    Multimap<String, LocalFile> localFiles = HashMultimap.create();
 
     LOG.debug("Populating Runnable LocalFiles");
     for (Map.Entry<String, RuntimeSpecification> entry: twillSpec.getRunnables().entrySet()) {
@@ -516,14 +570,14 @@ final class YarnTwillPreparer implements TwillPreparer {
         Location location;
 
         URI uri = localFile.getURI();
-        if (locationFactory.getHomeLocation().toURI().getScheme().equals(uri.getScheme())) {
+        if (appLocation.toURI().getScheme().equals(uri.getScheme())) {
           // If the source file location is having the same scheme as the target location, no need to copy
-          location = locationFactory.create(uri);
+          location = appLocation.getLocationFactory().create(uri);
         } else {
           URL url = uri.toURL();
           LOG.debug("Create and copy {} : {}", runnableName, url);
           // Preserves original suffix for expansion.
-          location = copyFromURL(url, createTempLocation(Paths.appendSuffix(url.getFile(), localFile.getName())));
+          location = copyFromURL(url, createTempLocation(Paths.addExtension(url.getFile(), localFile.getName())));
           LOG.debug("Done {} : {}", runnableName, url);
         }
 
@@ -533,10 +587,12 @@ final class YarnTwillPreparer implements TwillPreparer {
       }
     }
     LOG.debug("Done Runnable LocalFiles");
+    return localFiles;
   }
 
-  private void saveSpecification(TwillSpecification spec, final Multimap<String, LocalFile> runnableLocalFiles,
-                                 Map<String, LocalFile> localFiles) throws IOException {
+  private void saveSpecification(TwillSpecification spec, Path targetFile) throws IOException {
+    final Multimap<String, LocalFile> runnableLocalFiles = populateRunnableLocalFiles(spec);
+
     // Rewrite LocalFiles inside twillSpec
     Map<String, RuntimeSpecification> runtimeSpec = Maps.transformEntries(
       spec.getRunnables(), new Maps.EntryTransformer<String, RuntimeSpecification, RuntimeSpecification>() {
@@ -548,9 +604,8 @@ final class YarnTwillPreparer implements TwillPreparer {
     });
 
     // Serialize into a local temp file.
-    LOG.debug("Create and copy {}", Constants.Files.TWILL_SPEC);
-    Location location = createTempLocation(Constants.Files.TWILL_SPEC);
-    try (Writer writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8)) {
+    LOG.debug("Creating {}", targetFile);
+    try (Writer writer = Files.newBufferedWriter(targetFile, StandardCharsets.UTF_8)) {
       EventHandlerSpecification eventHandler = spec.getEventHandler();
       if (eventHandler == null) {
         eventHandler = new LogOnlyEventHandler().configure();
@@ -558,136 +613,124 @@ final class YarnTwillPreparer implements TwillPreparer {
       TwillSpecification newTwillSpec = new DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(),
                                                                       spec.getPlacementPolicies(), eventHandler);
       TwillRuntimeSpecificationAdapter.create().toJson(
-        new TwillRuntimeSpecification(newTwillSpec, locationFactory.getHomeLocation().getName(),
-                                      getAppLocation().toURI(), zkConnectString, runId, twillSpec.getName(),
+        new TwillRuntimeSpecification(newTwillSpec, appLocation.getLocationFactory().getHomeLocation().getName(),
+                                      appLocation.toURI(), zkConnectString, runId, twillSpec.getName(),
                                       reservedMemory, yarnConfig.get(YarnConfiguration.RM_SCHEDULER_ADDRESS),
                                       logLevels), writer);
     }
-    LOG.debug("Done {}", Constants.Files.TWILL_SPEC);
-
-    localFiles.put(Constants.Files.TWILL_SPEC, createLocalFile(Constants.Files.TWILL_SPEC, location));
+    LOG.debug("Done {}", targetFile);
   }
 
-  private void saveLogback(Map<String, LocalFile> localFiles) throws IOException {
-    LOG.debug("Create and copy {}", Constants.Files.LOGBACK_TEMPLATE);
-    Location location = copyFromURL(getClass().getClassLoader().getResource(Constants.Files.LOGBACK_TEMPLATE),
-                                    createTempLocation(Constants.Files.LOGBACK_TEMPLATE));
-    LOG.debug("Done {}", Constants.Files.LOGBACK_TEMPLATE);
+  private void saveLogback(Path targetFile) throws IOException {
+    URL url = getClass().getClassLoader().getResource(Constants.Files.LOGBACK_TEMPLATE);
+    if (url == null) {
+      return;
+    }
 
-    localFiles.put(Constants.Files.LOGBACK_TEMPLATE, createLocalFile(Constants.Files.LOGBACK_TEMPLATE, location));
+    LOG.debug("Creating {}", targetFile);
+    try (InputStream is = url.openStream()) {
+      Files.copy(is, targetFile);
+    }
+    LOG.debug("Done {}", targetFile);
   }
 
   /**
    * Creates the launcher.jar for launch the main application.
    */
-  private void saveLauncher(Map<String, LocalFile> localFiles) throws URISyntaxException, IOException {
+  private void createLauncherJar(Map<String, LocalFile> localFiles) throws URISyntaxException, IOException {
 
     LOG.debug("Create and copy {}", Constants.Files.LAUNCHER_JAR);
-    Location location = createTempLocation(Constants.Files.LAUNCHER_JAR);
 
-    final String launcherName = TwillLauncher.class.getName();
-    final String portFinderName = FindFreePort.class.getName();
-
-    // Create a jar file with the TwillLauncher optionally a json serialized classpath.json in it.
-    // Also a little utility to find a free port, used for debugging.
-    final JarOutputStream jarOut = new JarOutputStream(location.getOutputStream());
-    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-    if (classLoader == null) {
-      classLoader = getClass().getClassLoader();
-    }
-    Dependencies.findClassDependencies(classLoader, new ClassAcceptor() {
+    Location location = locationCache.get(Constants.Files.LAUNCHER_JAR, new LocationCache.Loader() {
       @Override
-      public boolean accept(String className, URL classUrl, URL classPathUrl) {
-        Preconditions.checkArgument(className.startsWith(launcherName) || className.equals(portFinderName),
-                                    "Launcher jar should not have dependencies: %s", className);
-        try {
-          jarOut.putNextEntry(new JarEntry(className.replace('.', '/') + ".class"));
-          try (InputStream is = classUrl.openStream()) {
-            ByteStreams.copy(is, jarOut);
+      public void load(String name, Location targetLocation) throws IOException {
+        // Create a jar file with the TwillLauncher and FindFreePort and dependent classes inside.
+        try (final JarOutputStream jarOut = new JarOutputStream(targetLocation.getOutputStream())) {
+          ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+          if (classLoader == null) {
+            classLoader = getClass().getClassLoader();
           }
-        } catch (IOException e) {
-          throw Throwables.propagate(e);
+          Dependencies.findClassDependencies(classLoader, new ClassAcceptor() {
+            @Override
+            public boolean accept(String className, URL classUrl, URL classPathUrl) {
+              try {
+                jarOut.putNextEntry(new JarEntry(className.replace('.', '/') + ".class"));
+                try (InputStream is = classUrl.openStream()) {
+                  ByteStreams.copy(is, jarOut);
+                }
+              } catch (IOException e) {
+                throw Throwables.propagate(e);
+              }
+              return true;
+            }
+          }, TwillLauncher.class.getName(), FindFreePort.class.getName());
         }
-        return true;
       }
-    }, launcherName, portFinderName);
+    });
 
-    try {
-      addClassPaths(Constants.CLASSPATH, classPaths, jarOut);
-      addClassPaths(Constants.APPLICATION_CLASSPATH, applicationClassPaths, jarOut);
-    } finally {
-      jarOut.close();
-    }
     LOG.debug("Done {}", Constants.Files.LAUNCHER_JAR);
 
     localFiles.put(Constants.Files.LAUNCHER_JAR, createLocalFile(Constants.Files.LAUNCHER_JAR, location));
   }
 
-  private void addClassPaths(String classpathId, List<String> classPaths, JarOutputStream jarOut) throws IOException {
-    if (!classPaths.isEmpty()) {
-      jarOut.putNextEntry(new JarEntry(classpathId));
-      jarOut.write(Joiner.on(':').join(classPaths).getBytes(Charsets.UTF_8));
-    }
+  private void saveClassPaths(Path targetDir) throws IOException {
+    Files.write(targetDir.resolve(Constants.Files.APPLICATION_CLASSPATH),
+                Joiner.on(':').join(applicationClassPaths).getBytes(StandardCharsets.UTF_8));
+    Files.write(targetDir.resolve(Constants.Files.CLASSPATH),
+                Joiner.on(':').join(classPaths).getBytes(StandardCharsets.UTF_8));
   }
 
-  private void saveJvmOptions(Map<String, LocalFile> localFiles) throws IOException {
+  private void saveJvmOptions(final Path targetPath) throws IOException {
     if ((extraOptions == null || extraOptions.isEmpty()) &&
       JvmOptions.DebugOptions.NO_DEBUG.equals(debugOptions)) {
       // If no vm options, no need to localize the file.
       return;
     }
-    LOG.debug("Create and copy {}", Constants.Files.JVM_OPTIONS);
-    final Location location = createTempLocation(Constants.Files.JVM_OPTIONS);
+    LOG.debug("Creating {}", targetPath);
     JvmOptionsCodec.encode(new JvmOptions(extraOptions, debugOptions), new OutputSupplier<Writer>() {
       @Override
       public Writer getOutput() throws IOException {
-        return new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8);
+        return Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8);
       }
     });
-    LOG.debug("Done {}", Constants.Files.JVM_OPTIONS);
-
-    localFiles.put(Constants.Files.JVM_OPTIONS, createLocalFile(Constants.Files.JVM_OPTIONS, location));
+    LOG.debug("Done {}", targetPath);
   }
 
-  private void saveArguments(Arguments arguments, Map<String, LocalFile> localFiles) throws IOException {
-    LOG.debug("Create and copy {}", Constants.Files.ARGUMENTS);
-    final Location location = createTempLocation(Constants.Files.ARGUMENTS);
+  private void saveArguments(Arguments arguments, final Path targetPath) throws IOException {
+    LOG.debug("Creating {}", targetPath);
     ArgumentsCodec.encode(arguments, new OutputSupplier<Writer>() {
       @Override
       public Writer getOutput() throws IOException {
-        return new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8);
+        return Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8);
       }
     });
-    LOG.debug("Done {}", Constants.Files.ARGUMENTS);
-
-    localFiles.put(Constants.Files.ARGUMENTS, createLocalFile(Constants.Files.ARGUMENTS, location));
+    LOG.debug("Done {}", targetPath);
   }
 
-  private void saveEnvironments(Map<String, LocalFile> localFiles) throws IOException {
+  private void saveEnvironments(Path targetPath) throws IOException {
     if (environments.isEmpty()) {
       return;
     }
 
-    LOG.debug("Create and copy {}", Constants.Files.ENVIRONMENTS);
-    final Location location = createTempLocation(Constants.Files.ENVIRONMENTS);
-    try (Writer writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8)) {
+    LOG.debug("Creating {}", targetPath);
+    try (Writer writer = Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8)) {
       new Gson().toJson(environments, writer);
     }
-    LOG.debug("Done {}", Constants.Files.ENVIRONMENTS);
-
-    localFiles.put(Constants.Files.ENVIRONMENTS, createLocalFile(Constants.Files.ENVIRONMENTS, location));
+    LOG.debug("Done {}", targetPath);
   }
 
   /**
-   * Serializes the list of files that needs to localize from AM to Container.
+   * Serializes the information for files that are localized to all YARN containers.
    */
-  private void saveLocalFiles(Map<String, LocalFile> localFiles, Set<String> includes) throws IOException {
-    Map<String, LocalFile> localize = ImmutableMap.copyOf(Maps.filterKeys(localFiles, Predicates.in(includes)));
+  private void createLocalizeFilesJson(Map<String, LocalFile> localFiles) throws IOException {
     LOG.debug("Create and copy {}", Constants.Files.LOCALIZE_FILES);
     Location location = createTempLocation(Constants.Files.LOCALIZE_FILES);
-    try (Writer writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8)) {
+
+    // Serialize the list of LocalFiles, except the one we are generating here, as this file is used by AM only.
+    // This file should never use LocationCache.
+    try (Writer writer = new OutputStreamWriter(location.getOutputStream(), StandardCharsets.UTF_8)) {
       new GsonBuilder().registerTypeAdapter(LocalFile.class, new LocalFileCodec())
-        .create().toJson(localize.values(), new TypeToken<List<LocalFile>>() {
+        .create().toJson(localFiles.values(), new TypeToken<List<LocalFile>>() {
       }.getType(), writer);
     }
     LOG.debug("Done {}", Constants.Files.LOCALIZE_FILES);
@@ -711,16 +754,12 @@ final class YarnTwillPreparer implements TwillPreparer {
     name = fileName.substring(0, fileName.length() - suffix.length() - 1);
 
     try {
-      return getAppLocation().append(name).getTempFile('.' + suffix);
+      return appLocation.append(name).getTempFile('.' + suffix);
     } catch (IOException e) {
       throw Throwables.propagate(e);
     }
   }
 
-  private Location getAppLocation() {
-    return locationFactory.create(String.format("/%s/%s", twillSpec.getName(), runId.getId()));
-  }
-
   /**
    * Returns the context ClassLoader if there is any, otherwise, returns ClassLoader of this class.
    */
@@ -728,4 +767,23 @@ final class YarnTwillPreparer implements TwillPreparer {
     ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
     return classLoader == null ? getClass().getClassLoader() : classLoader;
   }
+
+  private ApplicationBundler createBundler(ClassAcceptor classAcceptor) {
+    return new ApplicationBundler(classAcceptor).setTempDir(localStagingDir);
+  }
+
+  /**
+   * Creates a {@link ApplicationBundler} for building application jar. The bundler will include classes
+   * accepted by the given {@link ClassAcceptor}, as long as it is not a twill class.
+   */
+  private ApplicationBundler createApplicationJarBundler(final ClassAcceptor classAcceptor) {
+    // Accept classes based on the classAcceptor and also excluding all twill classes as they are already
+    // in the twill.jar
+    return createBundler(new ClassAcceptor() {
+      @Override
+      public boolean accept(String className, URL classUrl, URL classPathUrl) {
+        return !twillClassPaths.contains(classPathUrl) && classAcceptor.accept(className, classUrl, classPathUrl);
+      }
+    });
+  }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index 1b7c4b9..405eb24 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -17,7 +17,7 @@
  */
 package org.apache.twill.yarn;
 
-import com.google.common.base.Charsets;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -34,20 +34,18 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Table;
 import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.Callables;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Service;
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.twill.api.ClassAcceptor;
+import org.apache.twill.api.Configs;
+import org.apache.twill.api.LocalFile;
 import org.apache.twill.api.ResourceSpecification;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.SecureStore;
@@ -58,23 +56,24 @@ import org.apache.twill.api.TwillPreparer;
 import org.apache.twill.api.TwillRunnable;
 import org.apache.twill.api.TwillRunnerService;
 import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.api.logging.LogHandler;
 import org.apache.twill.common.Cancellable;
 import org.apache.twill.common.Threads;
 import org.apache.twill.filesystem.FileContextLocationFactory;
 import org.apache.twill.filesystem.Location;
 import org.apache.twill.filesystem.LocationFactory;
-import org.apache.twill.internal.Configs;
 import org.apache.twill.internal.Constants;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.RunIds;
 import org.apache.twill.internal.SingleRunnableApplication;
 import org.apache.twill.internal.appmaster.ApplicationMasterLiveNodeData;
+import org.apache.twill.internal.io.BasicLocationCache;
+import org.apache.twill.internal.io.LocationCache;
+import org.apache.twill.internal.io.NoCachingLocationCache;
+import org.apache.twill.internal.utils.Dependencies;
 import org.apache.twill.internal.yarn.VersionDetectYarnAppClientFactory;
 import org.apache.twill.internal.yarn.YarnAppClient;
 import org.apache.twill.internal.yarn.YarnApplicationReport;
-import org.apache.twill.internal.yarn.YarnUtils;
 import org.apache.twill.zookeeper.NodeChildren;
 import org.apache.twill.zookeeper.NodeData;
 import org.apache.twill.zookeeper.RetryStrategies;
@@ -93,6 +92,8 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.URL;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -109,7 +110,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public final class YarnTwillRunnerService implements TwillRunnerService {
 
   private static final Logger LOG = LoggerFactory.getLogger(YarnTwillRunnerService.class);
-
   private static final int ZK_TIMEOUT = 10000;
   private static final Function<String, RunId> STRING_TO_RUN_ID = new Function<String, RunId>() {
     @Override
@@ -130,8 +130,11 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
   private final ZKClientService zkClientService;
   private final LocationFactory locationFactory;
   private final Table<String, RunId, YarnTwillController> controllers;
+  private final Set<URL> twillClassPaths;
   // A Guava service to help the state transition.
   private final Service serviceDelegate;
+  private LocationCache locationCache;
+  private LocationCacheCleaner locationCacheCleaner;
   private ScheduledExecutorService secureStoreScheduler;
 
   private Iterable<LiveInfo> liveInfos;
@@ -163,6 +166,7 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
     this.locationFactory = locationFactory;
     this.zkClientService = getZKClientService(zkConnect);
     this.controllers = HashBasedTable.create();
+    this.twillClassPaths = new HashSet<>();
     this.serviceDelegate = new AbstractIdleService() {
       @Override
       protected void startUp() throws Exception {
@@ -277,9 +281,16 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
     Preconditions.checkState(serviceDelegate.isRunning(), "Service not start. Please call start() first.");
     final TwillSpecification twillSpec = application.configure();
     final String appName = twillSpec.getName();
+    RunId runId = RunIds.generate();
+    Location appLocation = locationFactory.create(String.format("/%s/%s", twillSpec.getName(), runId.getId()));
+    LocationCache locationCache = this.locationCache;
+    if (locationCache == null) {
+      locationCache = new NoCachingLocationCache(appLocation);
+    }
 
-    return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService.getConnectString(),
-                                 locationFactory, jvmOptions, LogEntry.Level.INFO, new YarnTwillControllerFactory() {
+    return new YarnTwillPreparer(yarnConfig, twillSpec, runId, yarnAppClient,
+                                 zkClientService.getConnectString(), appLocation, twillClassPaths, jvmOptions,
+                                 locationCache, new YarnTwillControllerFactory() {
       @Override
       public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
                                         Callable<ProcessController<YarnApplicationReport>> startUp,
@@ -324,6 +335,19 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
   private void startUp() throws Exception {
     zkClientService.startAndWait();
 
+    // Find all the classpaths for Twill classes. It is used for class filtering when building application jar
+    // in the YarnTwillPreparer
+    Dependencies.findClassDependencies(getClass().getClassLoader(), new ClassAcceptor() {
+      @Override
+      public boolean accept(String className, URL classUrl, URL classPathUrl) {
+        if (!className.startsWith("org.apache.twill.")) {
+          return false;
+        }
+        twillClassPaths.add(classPathUrl);
+        return true;
+      }
+    }, getClass().getName());
+
     // Create the root node, so that the namespace root would get created if it is missing
     // If the exception is caused by node exists, then it's ok. Otherwise propagate the exception.
     ZKOperations.ignoreError(zkClientService.create("/", null, CreateMode.PERSISTENT),
@@ -346,6 +370,70 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
       scheduleSecureStoreUpdate(new LocationSecureStoreUpdater(yarnConfig, locationFactory),
                                 delay, delay, TimeUnit.MILLISECONDS);
     }
+
+    // Optionally create a LocationCache
+    String cacheDir = yarnConfig.get(Configs.Keys.LOCATION_CACHE_DIR);
+    if (cacheDir != null) {
+      String sessionId = Long.toString(System.currentTimeMillis());
+      try {
+        Location cacheBase = locationFactory.create(cacheDir);
+        cacheBase.mkdirs();
+        cacheBase.setPermissions("775");
+
+        // Use a unique cache directory for each instance of this class
+        Location cacheLocation = cacheBase.append(sessionId);
+        cacheLocation.mkdirs();
+        cacheLocation.setPermissions("775");
+
+        locationCache = new BasicLocationCache(cacheLocation);
+        locationCacheCleaner = startLocationCacheCleaner(cacheBase, sessionId);
+      } catch (IOException e) {
+        LOG.warn("Failed to create location cache directory. Location cache cannot be enabled.", e);
+      }
+    }
+  }
+
+  /**
+   * Forces a cleanup of location cache based on the given time.
+   */
+  @VisibleForTesting
+  void forceLocationCacheCleanup(long currentTime) {
+    locationCacheCleaner.forceCleanup(currentTime);
+  }
+
+  private LocationCacheCleaner startLocationCacheCleaner(final Location cacheBase, final String sessionId) {
+    LocationCacheCleaner cleaner = new LocationCacheCleaner(
+      yarnConfig, cacheBase, sessionId, new Predicate<Location>() {
+        @Override
+        public boolean apply(Location location) {
+          // Collects all the locations that is being used by any live applications
+          Set<Location> activeLocations = new HashSet<>();
+          synchronized (YarnTwillRunnerService.this) {
+            for (YarnTwillController controller : controllers.values()) {
+              ApplicationMasterLiveNodeData amLiveNodeData = controller.getApplicationMasterLiveNodeData();
+              if (amLiveNodeData != null) {
+                for (LocalFile localFile : amLiveNodeData.getLocalFiles()) {
+                  activeLocations.add(locationFactory.create(localFile.getURI()));
+                }
+              }
+            }
+          }
+
+          try {
+            // Always keep the launcher.jar and twill.jar from the current session as they should never change,
+            // hence never expires
+            activeLocations.add(cacheBase.append(sessionId).append(Constants.Files.LAUNCHER_JAR));
+            activeLocations.add(cacheBase.append(sessionId).append(Constants.Files.TWILL_JAR));
+          } catch (IOException e) {
+            // This should not happen
+            LOG.warn("Failed to construct cache location", e);
+          }
+
+          return !activeLocations.contains(location);
+        }
+      });
+    cleaner.startAndWait();
+    return cleaner;
   }
 
   private void shutDown() throws Exception {
@@ -354,6 +442,9 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
     // when the JVM process is about to exit. Hence it is important that threads created in the controllers are
     // daemon threads.
     synchronized (this) {
+      if (locationCacheCleaner != null) {
+        locationCacheCleaner.stopAndWait();
+      }
       if (secureStoreScheduler != null) {
         secureStoreScheduler.shutdownNow();
       }
@@ -496,8 +587,9 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
         if (cancelled.get()) {
           return;
         }
-        ApplicationId appId = getApplicationId(result);
-        if (appId == null) {
+
+        ApplicationMasterLiveNodeData amLiveNodeData = ApplicationMasterLiveNodeDecoder.decode(result);
+        if (amLiveNodeData == null) {
           return;
         }
 
@@ -505,8 +597,7 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
           if (!controllers.contains(appName, runId)) {
             ZKClient zkClient = ZKClients.namespace(zkClientService, "/" + appName);
             YarnTwillController controller = listenController(
-              new YarnTwillController(appName, runId, zkClient,
-                                      Callables.returning(yarnAppClient.createProcessController(appId))));
+              new YarnTwillController(appName, runId, zkClient, amLiveNodeData, yarnAppClient));
             controllers.put(appName, runId, controller);
             controller.start();
           }
@@ -521,40 +612,6 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
   }
 
 
-  /**
-   * Decodes application ID stored inside the node data.
-   * @param nodeData The node data to decode from. If it is {@code null}, this method would return {@code null}.
-   * @return The ApplicationId or {@code null} if failed to decode.
-   */
-  private ApplicationId getApplicationId(NodeData nodeData) {
-    byte[] data = nodeData == null ? null : nodeData.getData();
-    if (data == null) {
-      return null;
-    }
-
-    Gson gson = new Gson();
-    JsonElement json = gson.fromJson(new String(data, Charsets.UTF_8), JsonElement.class);
-    if (!json.isJsonObject()) {
-      LOG.warn("Unable to decode live data node.");
-      return null;
-    }
-
-    JsonObject jsonObj = json.getAsJsonObject();
-    json = jsonObj.get("data");
-    if (!json.isJsonObject()) {
-      LOG.warn("Property data not found in live data node.");
-      return null;
-    }
-
-    try {
-      ApplicationMasterLiveNodeData amLiveNode = gson.fromJson(json, ApplicationMasterLiveNodeData.class);
-      return YarnUtils.createApplicationId(amLiveNode.getAppIdClusterTime(), amLiveNode.getAppId());
-    } catch (Exception e) {
-      LOG.warn("Failed to decode application live node data.", e);
-      return null;
-    }
-  }
-
   private void updateSecureStores(Table<String, RunId, SecureStore> secureStores) {
     for (Table.Cell<String, RunId, SecureStore> cell : secureStores.cellSet()) {
       Object store = cell.getValue().getStore();

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
index 4f95391..4c7d84b 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
@@ -21,6 +21,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterables;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.twill.api.Configs;
 import org.apache.twill.api.TwillController;
 import org.apache.twill.api.TwillRunner;
 import org.junit.After;
@@ -51,7 +52,7 @@ public abstract class BaseYarnTest {
    * A singleton wrapper so that yarn cluster only bring up once across all tests in the YarnTestSuite.
    */
   @ClassRule
-  public static final TwillTester TWILL_TESTER = new TwillTester() {
+  public static final TwillTester TWILL_TESTER = new TwillTester(Configs.Keys.LOCATION_CACHE_DIR, ".cache") {
     private final AtomicInteger instances = new AtomicInteger();
 
     @Override

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
index e20dd64..b646bbc 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
@@ -19,7 +19,6 @@ package org.apache.twill.yarn;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
-import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 import com.google.common.io.LineReader;
 import org.apache.twill.api.TwillApplication;
@@ -36,12 +35,13 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
 import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
@@ -53,15 +53,23 @@ public final class LocalFileTestRun extends BaseYarnTest {
 
   @Test
   public void testLocalFile() throws Exception {
-    String header = Files.readFirstLine(new File(getClass().getClassLoader().getResource("header.txt").toURI()),
-                                        Charsets.UTF_8);
+    // Generate a header and a footer files.
+    File headerFile = tmpFolder.newFile("header.txt");
+    File footerFile = tmpFolder.newFile("footer.txt");
+
+    String headerMsg = "Header Message";
+    String footerMsg = "Footer Message";
+
+    Files.write(headerMsg, headerFile, StandardCharsets.UTF_8);
+    Files.write(footerMsg, footerFile, StandardCharsets.UTF_8);
 
     TwillRunner runner = getTwillRunner();
 
-    TwillController controller = runner.prepare(new LocalFileApplication())
+    TwillController controller = runner.prepare(new LocalFileApplication(headerFile))
       .addJVMOptions(" -verbose:gc -Xloggc:gc.log -XX:+PrintGCDetails")
       .withApplicationArguments("local")
       .withArguments("LocalFileSocketServer", "local2")
+      .withResources(footerFile.toURI())
       .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
       .start();
 
@@ -75,8 +83,9 @@ public final class LocalFileTestRun extends BaseYarnTest {
 
       String msg = "Local file test";
       writer.println(msg);
-      Assert.assertEquals(header, reader.readLine());
+      Assert.assertEquals(headerMsg, reader.readLine());
       Assert.assertEquals(msg, reader.readLine());
+      Assert.assertEquals(footerMsg, reader.readLine());
     }
 
     controller.terminate().get(120, TimeUnit.SECONDS);
@@ -91,14 +100,14 @@ public final class LocalFileTestRun extends BaseYarnTest {
    */
   public static final class LocalFileApplication implements TwillApplication {
 
-    private final File headerFile;
+    private final File headerJar;
 
-    public LocalFileApplication() throws Exception {
+    public LocalFileApplication(File headerFile) throws Exception {
       // Create a jar file that contains the header.txt file inside.
-      headerFile = tmpFolder.newFile("header.jar");
-      try (JarOutputStream os = new JarOutputStream(new FileOutputStream(headerFile))) {
-        os.putNextEntry(new JarEntry("header.txt"));
-        ByteStreams.copy(getClass().getClassLoader().getResourceAsStream("header.txt"), os);
+      headerJar = tmpFolder.newFile("header.jar");
+      try (JarOutputStream os = new JarOutputStream(new FileOutputStream(headerJar))) {
+        os.putNextEntry(new JarEntry(headerFile.getName()));
+        Files.copy(headerFile, os);
       }
     }
 
@@ -109,7 +118,7 @@ public final class LocalFileTestRun extends BaseYarnTest {
         .withRunnable()
           .add(new LocalFileSocketServer())
             .withLocalFiles()
-              .add("header", headerFile, true).apply()
+              .add("header", headerJar, true).apply()
         .anyOrder()
         .build();
     }
@@ -123,14 +132,21 @@ public final class LocalFileTestRun extends BaseYarnTest {
     private static final Logger LOG = LoggerFactory.getLogger(LocalFileSocketServer.class);
 
     @Override
-    public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
+    public void handleRequest(BufferedReader reader, PrintWriter writer) throws Exception {
       // Verify there is a gc.log file locally
       Preconditions.checkState(new File("gc.log").exists());
 
+      // Get the footer file from classloader. Since it was added as resources as well, it should be loadable from CL.
+      URL footerURL = getClass().getClassLoader().getResource("footer.txt");
+      Preconditions.checkState(footerURL != null, "Missing footer.txt file from classloader");
+
       LOG.info("handleRequest");
-      String header = Files.toString(new File("header/header.txt"), Charsets.UTF_8);
-      writer.write(header);
+      // Read from the localized file
+      writer.println(Files.readFirstLine(new File("header/header.txt"), Charsets.UTF_8));
+      // Read from the request
       writer.println(reader.readLine());
+      // Read from resource
+      writer.println(Files.readFirstLine(new File(footerURL.toURI()), Charsets.UTF_8));
       LOG.info("Flushed response");
     }
   }

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
new file mode 100644
index 0000000..8ed9ae4
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.Configs;
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillRunnerService;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.filesystem.LocationFactory;
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.io.LocationCache;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Unit test for {@link LocationCache} usage in {@link YarnTwillRunnerService}.
+ */
+public class LocationCacheTest {
+
+  // Create a TwillTester with LocationCache enabled
+  @ClassRule
+  public static final TwillTester TWILL_TESTER = new TwillTester(Configs.Keys.LOCATION_CACHE_DIR, ".cache");
+
+  @Test(timeout = 120000L)
+  public void testLocationCache() throws Exception {
+    TwillRunner twillRunner = TWILL_TESTER.getTwillRunner();
+
+    // Start the runnable
+    TwillController controller = twillRunner.prepare(new BlockingTwillRunnable())
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .start();
+
+    // Wait until the runnable is runnable
+    String runnableName = BlockingTwillRunnable.class.getSimpleName();
+    ResourceReport resourceReport = controller.getResourceReport();
+    while (resourceReport == null || resourceReport.getRunnableResources(runnableName).isEmpty()) {
+      TimeUnit.SECONDS.sleep(1);
+      resourceReport = controller.getResourceReport();
+    }
+
+    long startTime = System.currentTimeMillis();
+
+    // Inspect the cache directory, there should be a directory, which is the current session
+    // inside that directory, there should be three files, launcher.jar, twill.jar and an application jar
+    LocationFactory locationFactory = TWILL_TESTER.createLocationFactory();
+    Location cacheBase = locationFactory.create(".cache");
+
+    List<Location> cacheDirs = cacheBase.list();
+    Assert.assertEquals(1, cacheDirs.size());
+
+    Location currentSessionCache = cacheDirs.get(0);
+    Assert.assertEquals(3, currentSessionCache.list().size());
+
+    // Force a cleanup of cache. The first call is to collect the locations to be cleanup.
+    // The second call is the actual cleanup.
+    ((YarnTwillRunnerService) twillRunner).forceLocationCacheCleanup(startTime);
+    ((YarnTwillRunnerService) twillRunner).forceLocationCacheCleanup(startTime +
+                                                                       Configs.Defaults.LOCATION_CACHE_EXPIRY_MS);
+
+    // Since the app is still runnable, no files in the cache should get removed.
+    Assert.assertEquals(3, currentSessionCache.list().size());
+
+    // Stop the app
+    controller.terminate().get();
+
+    // Force a cleanup of cache. The first call is to collect the locations to be cleanup.
+    // The second call is the actual cleanup.
+    ((YarnTwillRunnerService) twillRunner).forceLocationCacheCleanup(startTime);
+    ((YarnTwillRunnerService) twillRunner).forceLocationCacheCleanup(startTime +
+                                                                       Configs.Defaults.LOCATION_CACHE_EXPIRY_MS);
+
+    // Since the app is stopped, there should only be two files, the launcher.jar and twill.jar, as they
+    // will never get removed for the current session.
+    Set<Location> cachedLocations = new HashSet<>(currentSessionCache.list());
+    Assert.assertEquals(2, cachedLocations.size());
+    Assert.assertTrue(cachedLocations.contains(currentSessionCache.append(Constants.Files.LAUNCHER_JAR)));
+    Assert.assertTrue(cachedLocations.contains(currentSessionCache.append(Constants.Files.TWILL_JAR)));
+
+    // Start another YarnTwillRunnerService
+    TwillRunnerService newTwillRunner = TWILL_TESTER.createTwillRunnerService();
+    newTwillRunner.start();
+
+    // Force a cleanup using the antique expiry. The list of locations that need to be cleanup was already
+    // collected when the new twill runner was started
+    ((YarnTwillRunnerService) newTwillRunner)
+      .forceLocationCacheCleanup(System.currentTimeMillis() + Configs.Defaults.LOCATION_CACHE_ANTIQUE_EXPIRY_MS);
+
+    // Now there shouldn't be any file under the current session cache directory
+    Assert.assertTrue(currentSessionCache.list().isEmpty());
+  }
+
+  /**
+   * A runnable that blocks until stopped explicitly.
+   */
+  public static final class BlockingTwillRunnable extends AbstractTwillRunnable {
+
+    private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+    @Override
+    public void run() {
+      Uninterruptibles.awaitUninterruptibly(stopLatch);
+    }
+
+    @Override
+    public void stop() {
+      stopLatch.countDown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
index e9e6a99..d59a15b 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
@@ -120,5 +120,5 @@ public abstract class SocketServer extends AbstractTwillRunnable {
     }
   }
 
-  public abstract void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException;
+  public abstract void handleRequest(BufferedReader reader, PrintWriter writer) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
index 4a2146e..9daf06c 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
@@ -19,9 +19,11 @@ package org.apache.twill.yarn;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Service;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -32,8 +34,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.twill.api.Configs;
+import org.apache.twill.api.TwillController;
 import org.apache.twill.api.TwillRunner;
 import org.apache.twill.api.TwillRunnerService;
+import org.apache.twill.filesystem.FileContextLocationFactory;
+import org.apache.twill.filesystem.LocationFactory;
 import org.apache.twill.internal.yarn.VersionDetectYarnAppClientFactory;
 import org.apache.twill.internal.yarn.YarnAppClient;
 import org.apache.twill.internal.yarn.YarnUtils;
@@ -45,7 +51,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * A TwillTester rule allows creation of mini Yarn cluster and {@link TwillRunner} used for testing that is
@@ -69,6 +78,8 @@ public class TwillTester extends ExternalResource {
   private static final Logger LOG = LoggerFactory.getLogger(TwillTester.class);
 
   private final TemporaryFolder tmpFolder = new TemporaryFolder();
+  private final Map<String, String> extraConfig;
+
   private InMemoryZKServer zkServer;
   private MiniDFSCluster dfsCluster;
   private MiniYARNCluster cluster;
@@ -76,6 +87,25 @@ public class TwillTester extends ExternalResource {
   private TwillRunnerService twillRunner;
   private YarnAppClient yarnAppClient;
 
+  /**
+   * Creates a new instance with the give list of configurations.
+   *
+   * @param configs list of configuration pairs.
+   *                The list must be in the form of {@code (key1, value1, key2, value2, ...)},
+   *                hence the length of configs must be even.
+   *                The {@link Object#toString()} method will be called to obtain the keys and values that go into
+   *                the configuration.
+   */
+  public TwillTester(Object...configs) {
+    Preconditions.checkArgument(configs.length % 2 == 0,
+                                "Arguments must be in pair form like (k1, v1, k2, v2): %s", Arrays.toString(configs));
+
+    this.extraConfig = new HashMap<>();
+    for (int i = 0; i < configs.length; i += 2) {
+      this.extraConfig.put(configs[i].toString(), configs[i + 1].toString());
+    }
+  }
+
   @Override
   protected void before() throws Throwable {
     tmpFolder.create();
@@ -89,6 +119,11 @@ public class TwillTester extends ExternalResource {
     LOG.info("Starting Mini DFS on path {}", miniDFSDir);
     Configuration fsConf = new HdfsConfiguration(new Configuration());
     fsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, miniDFSDir.getAbsolutePath());
+
+    for (Map.Entry<String, String> entry : extraConfig.entrySet()) {
+      fsConf.set(entry.getKey(), entry.getValue());
+    }
+
     dfsCluster = new MiniDFSCluster.Builder(fsConf).numDataNodes(1).build();
 
     Configuration conf = new YarnConfiguration(dfsCluster.getFileSystem().getConf());
@@ -108,11 +143,13 @@ public class TwillTester extends ExternalResource {
     conf.set("yarn.scheduler.minimum-allocation-mb", "128");
     conf.set("yarn.nodemanager.delete.debug-delay-sec", "3600");
 
+    conf.set(Configs.Keys.LOCAL_STAGING_DIRECTORY, tmpFolder.newFolder().getAbsolutePath());
+
     cluster = new MiniYARNCluster("test-cluster", 3, 1, 1);
     cluster.init(conf);
     cluster.start();
 
-    this.config = new YarnConfiguration(cluster.getConfig());
+    config = new YarnConfiguration(cluster.getConfig());
 
     twillRunner = createTwillRunnerService();
     twillRunner.start();
@@ -122,6 +159,17 @@ public class TwillTester extends ExternalResource {
 
   @Override
   protected void after() {
+    // Stop all runnable applications
+    for (TwillRunner.LiveInfo info : twillRunner.lookupLive()) {
+      for (TwillController controller : info.getControllers()) {
+        try {
+          controller.terminate().get();
+        } catch (Exception e) {
+          LOG.warn("Exception raised when awaiting termination of {}", info.getApplicationName());
+        }
+      }
+    }
+
     try {
       twillRunner.stop();
     } catch (Exception e) {
@@ -146,13 +194,26 @@ public class TwillTester extends ExternalResource {
    * Creates an unstarted instance of {@link org.apache.twill.api.TwillRunnerService}.
    */
   public TwillRunnerService createTwillRunnerService() throws IOException {
-    YarnTwillRunnerService runner = new YarnTwillRunnerService(config, zkServer.getConnectionStr() + "/twill");
+    YarnTwillRunnerService runner = new YarnTwillRunnerService(config, zkServer.getConnectionStr() + "/twill",
+                                                               createLocationFactory());
     // disable tests stealing focus
     runner.setJVMOptions("-Djava.awt.headless=true");
     return runner;
   }
 
   /**
+   * Creates a {@link LocationFactory} for the mini YARN cluster.
+   */
+  public LocationFactory createLocationFactory() {
+    try {
+      FileContext fc = FileContext.getFileContext(config);
+      return new FileContextLocationFactory(config, fc, fc.getHomeDirectory().toUri().getPath());
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
    * Returns a {@link TwillRunner} that interact with the mini Yarn cluster.
    */
   public TwillRunner getTwillRunner() {

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-yarn/src/test/resources/header.txt
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/resources/header.txt b/twill-yarn/src/test/resources/header.txt
deleted file mode 100644
index b6e25e6..0000000
--- a/twill-yarn/src/test/resources/header.txt
+++ /dev/null
@@ -1 +0,0 @@
-Local file header


[2/2] twill git commit: (TWILL-63) Speed up application launch time

Posted by ch...@apache.org.
(TWILL-63) Speed up application launch time

The general approach is better jar files management and to cache and reuse jar files created through
class dependency tracing. The changes are further broken down as follows:

1. Refactor jars generation
  - One jar containing the TwillLauncher (launcher.jar), created through dependency tracing.
    - This jar is the same for all applications.
  - One jar containing all twill classes (twill.jar), created through dependency tracing.
    - This jar is the same for all applications.
  - One jar containing the application class, created through dependency tracing.
    - This jar is generated based on the application being launched. It is reusable when launching the same app multiple times.
  - One jar containing user resources setup through TwillPreparer.
    - This jar is not reused between apps.
  - One jar containing runtime config needed by Twill
    - logback.xml, jvm opts, environment, classpaths, ... etc
2. Let YARN to expand jars when localizing to containers instead of expanding it programatically
  - This save time in jar expansion when multiple containers are running on the same host
3. Introduce a new configuration "twill.location.cache.dir" to enable jar caching and reuse
  - Currently only the launcher.jar, twill.jar and application jar will be cached and reuse when possible
  - Cache cleanup logic is also in place to remove files in cache directory that is no longer used by application
4. The ApplicationBundler is improved to allow more flexible usage

This closes #21 on Github.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/5986553b
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/5986553b
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/5986553b

Branch: refs/heads/master
Commit: 5986553ba79836140c40ec8b16da7152a0972a2d
Parents: 5edc8dd
Author: Terence Yim <ch...@apache.org>
Authored: Tue Dec 6 17:05:11 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Sat Jan 7 12:56:41 2017 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/twill/api/Configs.java |  99 +++++
 .../apache/twill/internal/DefaultLocalFile.java |  12 +
 .../twill/filesystem/LocationFactories.java     |   2 +-
 .../org/apache/twill/internal/Constants.java    |  16 +-
 .../twill/internal/AbstractTwillService.java    |  31 +-
 .../twill/internal/ApplicationBundler.java      | 160 +++++---
 .../java/org/apache/twill/internal/Configs.java |  56 ---
 .../twill/internal/TwillContainerLauncher.java  |   1 -
 .../twill/internal/io/BasicLocationCache.java   |  52 +++
 .../apache/twill/internal/io/LocationCache.java |  54 +++
 .../internal/io/NoCachingLocationCache.java     |  45 +++
 .../org/apache/twill/internal/utils/Paths.java  |  45 ++-
 .../apache/twill/launcher/TwillLauncher.java    | 181 +++------
 .../apache/twill/internal/DebugOptionsTest.java |   2 +-
 .../org/apache/twill/internal/ServiceMain.java  |   2 +-
 .../ApplicationMasterLiveNodeData.java          |  23 +-
 .../appmaster/ApplicationMasterMain.java        |   2 +-
 .../appmaster/ApplicationMasterService.java     |  38 +-
 .../internal/container/TwillContainerMain.java  |   9 +-
 .../yarn/ApplicationMasterLiveNodeDecoder.java  |  83 ++++
 .../apache/twill/yarn/LocationCacheCleaner.java | 215 +++++++++++
 .../apache/twill/yarn/YarnTwillController.java  |  34 +-
 .../apache/twill/yarn/YarnTwillPreparer.java    | 386 +++++++++++--------
 .../twill/yarn/YarnTwillRunnerService.java      | 157 +++++---
 .../org/apache/twill/yarn/BaseYarnTest.java     |   3 +-
 .../org/apache/twill/yarn/LocalFileTestRun.java |  48 ++-
 .../apache/twill/yarn/LocationCacheTest.java    | 137 +++++++
 .../org/apache/twill/yarn/SocketServer.java     |   2 +-
 .../java/org/apache/twill/yarn/TwillTester.java |  65 +++-
 twill-yarn/src/test/resources/header.txt        |   1 -
 30 files changed, 1440 insertions(+), 521 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-api/src/main/java/org/apache/twill/api/Configs.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/Configs.java b/twill-api/src/main/java/org/apache/twill/api/Configs.java
new file mode 100644
index 0000000..9d195c9
--- /dev/null
+++ b/twill-api/src/main/java/org/apache/twill/api/Configs.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.api;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Defines keys and default values constants being used for configuration.
+ */
+public final class Configs {
+
+  /**
+   * Defines keys being used in configuration.
+   */
+  public static final class Keys {
+    /**
+     * Size in MB of reserved memory for Java process (non-heap memory).
+     */
+    public static final String JAVA_RESERVED_MEMORY_MB = "twill.java.reserved.memory.mb";
+
+    /**
+     * Set this to false to disable the secure store updates done by default.
+     */
+    public static final String SECURE_STORE_UPDATE_LOCATION_ENABLED = "twill.secure.store.update.location.enabled";
+
+    /**
+     * Specifies the local directory for twill to store files generated at runtime.
+     */
+    public static final String LOCAL_STAGING_DIRECTORY = "twill.local.staging.dir";
+
+    /**
+     * Setting caching directory name for location cache.
+     */
+    public static final String LOCATION_CACHE_DIR = "twill.location.cache.dir";
+
+    /**
+     * Setting the expiration time in milliseconds of unused files in the location cache.
+     * The value should be as long as the period when the same application will get launched again.
+     */
+    public static final String LOCATION_CACHE_EXPIRY_MS = "twill.location.cache.expiry.ms";
+
+    /**
+     * Setting the expiration time in milliseconds of unused files created by older runs in the location cache.
+     * The value should be relatively short as those cache files won't get reused after those applications
+     * that are using files completed. This expiry is mainly to workaround the delay that twill detects
+     * the set of all running applications from ZK.
+     */
+    public static final String LOCATION_CACHE_ANTIQUE_EXPIRY_MS = "twill.location.cache.antique.expiry.ms";
+
+    private Keys() {
+    }
+  }
+
+  /**
+   * Defines default configuration values.
+   */
+  public static final class Defaults {
+    /**
+     * Default have 200MB reserved for Java process.
+     */
+    public static final int JAVA_RESERVED_MEMORY_MB = 200;
+
+    /**
+     * Default use the system temp directory for local staging files.
+     */
+    public static final String LOCAL_STAGING_DIRECTORY = System.getProperty("java.io.tmpdir");
+
+    /**
+     * Default expiration is one day for location cache.
+     */
+    public static final long LOCATION_CACHE_EXPIRY_MS = TimeUnit.DAYS.toMillis(1);
+
+    /**
+     * Default expiration is five minutes for location cache created by different twill runner.
+     */
+    public static final long LOCATION_CACHE_ANTIQUE_EXPIRY_MS = TimeUnit.MINUTES.toMillis(5);
+
+    private Defaults() {
+    }
+  }
+
+  private Configs() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-api/src/main/java/org/apache/twill/internal/DefaultLocalFile.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultLocalFile.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultLocalFile.java
index 4fc78e1..7f3cd79 100644
--- a/twill-api/src/main/java/org/apache/twill/internal/DefaultLocalFile.java
+++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultLocalFile.java
@@ -73,4 +73,16 @@ public final class DefaultLocalFile implements LocalFile {
   public String getPattern() {
     return pattern;
   }
+
+  @Override
+  public String toString() {
+    return "DefaultLocalFile{" +
+      "name='" + name + '\'' +
+      ", uri=" + uri +
+      ", lastModified=" + lastModified +
+      ", size=" + size +
+      ", archive=" + archive +
+      ", pattern='" + pattern + '\'' +
+      '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-common/src/main/java/org/apache/twill/filesystem/LocationFactories.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/filesystem/LocationFactories.java b/twill-common/src/main/java/org/apache/twill/filesystem/LocationFactories.java
index 3b21f5e..9ee307a 100644
--- a/twill-common/src/main/java/org/apache/twill/filesystem/LocationFactories.java
+++ b/twill-common/src/main/java/org/apache/twill/filesystem/LocationFactories.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.net.URI;
 
 /**
- * Providers helper methods for creating different {@link LocationFactory}.
+ * Provides helper methods for creating different {@link LocationFactory}.
  */
 public final class LocationFactories {
 

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/internal/Constants.java b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
index 40b988f..569b396 100644
--- a/twill-common/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
@@ -44,9 +44,6 @@ public final class Constants {
 
   public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
 
-  public static final String CLASSPATH = "classpath";
-  public static final String APPLICATION_CLASSPATH = "application-classpath";
-
   /** Command names for the restart runnable instances. */
   public static final String RESTART_ALL_RUNNABLE_INSTANCES = "restartAllRunnableInstances";
   public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances";
@@ -68,14 +65,21 @@ public final class Constants {
   public static final class Files {
 
     public static final String LAUNCHER_JAR = "launcher.jar";
-    public static final String APP_MASTER_JAR = "appMaster.jar";
-    public static final String CONTAINER_JAR = "container.jar";
+    public static final String TWILL_JAR = "twill.jar";
+    public static final String APPLICATION_JAR = "application.jar";
+    public static final String RESOURCES_JAR = "resources.jar";
+    public static final String RUNTIME_CONFIG_JAR = "runtime.config.jar";
+
     public static final String LOCALIZE_FILES = "localizeFiles.json";
     public static final String TWILL_SPEC = "twillSpec.json";
     public static final String ARGUMENTS = "arguments.json";
     public static final String ENVIRONMENTS = "environments.json";
     public static final String LOGBACK_TEMPLATE = "logback-template.xml";
-    public static final String JVM_OPTIONS = "jvm.opts";
+    public static final String JVM_OPTIONS = "jvm.opts.json";
+
+    public static final String CLASSPATH = "classpath.txt";
+    public static final String APPLICATION_CLASSPATH = "application-classpath.txt";
+
     public static final String CREDENTIALS = "credentials.store";
     public static final String LOG_LEVELS = "logLevel.json";
 

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
index fc5b907..8e73653 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -127,6 +127,13 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
   }
 
   /**
+   * Returns a {@link Gson} instance for serializing object returned by the {@link #getLiveNodeData()} method.
+   */
+  protected Gson getLiveNodeGson() {
+    return GSON;
+  }
+
+  /**
    * Handles message by simply logging it. Child class should override this method for custom handling of message.
    *
    * @see org.apache.twill.internal.state.MessageCallback
@@ -204,16 +211,16 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
    * @return A {@link OperationFuture} that will be completed when the update is done.
    */
   protected final OperationFuture<?> updateLiveNode() {
-    String liveNode = getLiveNodePath();
-    LOG.info("Update live node {}{}", zkClient.getConnectString(), liveNode);
-    return zkClient.setData(liveNode, toJson(getLiveNodeJsonObject()));
+    String liveNodePath = getLiveNodePath();
+    LOG.info("Update live node {}{}", zkClient.getConnectString(), liveNodePath);
+    return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
   private OperationFuture<String> createLiveNode() {
-    String liveNode = getLiveNodePath();
-    LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNode);
-    return ZKOperations.ignoreError(zkClient.create(liveNode, toJson(getLiveNodeJsonObject()), CreateMode.EPHEMERAL),
-                                    KeeperException.NodeExistsException.class, liveNode);
+    String liveNodePath = getLiveNodePath();
+    LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
+    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
+                                    KeeperException.NodeExistsException.class, liveNodePath);
   }
 
   private OperationFuture<String> removeLiveNode() {
@@ -374,16 +381,12 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
     return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, runId.getId());
   }
 
-  private JsonObject getLiveNodeJsonObject() {
+  private byte[] serializeLiveNode() {
     JsonObject content = new JsonObject();
     Object liveNodeData = getLiveNodeData();
     if (liveNodeData != null) {
-      content.add("data", GSON.toJsonTree(liveNodeData));
+      content.add("data", getLiveNodeGson().toJsonTree(liveNodeData));
     }
-    return content;
-  }
-
-  private <T> byte[] toJson(T obj) {
-    return GSON.toJson(obj).getBytes(Charsets.UTF_8);
+    return GSON.toJson(content).getBytes(Charsets.UTF_8);
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java b/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
index c5beaca..8f82dbd 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
@@ -26,7 +26,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.io.ByteStreams;
-import com.google.common.io.Closeables;
 import com.google.common.io.Files;
 import org.apache.twill.api.ClassAcceptor;
 import org.apache.twill.filesystem.Location;
@@ -58,37 +57,17 @@ public final class ApplicationBundler {
 
   private static final Logger LOG = LoggerFactory.getLogger(ApplicationBundler.class);
 
-  public static final String SUBDIR_CLASSES = "classes/";
-  public static final String SUBDIR_LIB = "lib/";
-  public static final String SUBDIR_RESOURCES = "resources/";
-
   private final ClassAcceptor classAcceptor;
   private final Set<String> bootstrapClassPaths;
   private final CRC32 crc32;
 
-  /**
-   * Constructs a ApplicationBundler.
-   *
-   * @param classAcceptor ClassAcceptor for class packages to include
-   */
-  public ApplicationBundler(ClassAcceptor classAcceptor) {
-    this.classAcceptor = classAcceptor;
-    ImmutableSet.Builder<String> builder = ImmutableSet.builder();
-    for (String classpath : Splitter.on(File.pathSeparatorChar).split(System.getProperty("sun.boot.class.path"))) {
-      File file = new File(classpath);
-      builder.add(file.getAbsolutePath());
-      try {
-        builder.add(file.getCanonicalPath());
-      } catch (IOException e) {
-        // Ignore the exception and proceed.
-      }
-    }
-    this.bootstrapClassPaths = builder.build();
-    this.crc32 = new CRC32();
-  }
+  private File tempDir;
+  private String classesDir;
+  private String libDir;
+  private String resourcesDir;
 
   /**
-   * Constructs a ApplicationBundler.
+   * Constructs an ApplicationBundler.
    *
    * @param excludePackages Class packages to exclude
    */
@@ -97,7 +76,7 @@ public final class ApplicationBundler {
   }
 
   /**
-   * Constructs a ApplicationBundler.
+   * Constructs an ApplicationBundler.
    *
    * @param excludePackages Class packages to exclude
    * @param includePackages Class packages that should be included. Anything in this list will override the
@@ -122,6 +101,82 @@ public final class ApplicationBundler {
     });
   }
 
+  /**
+   * Constructs an ApplicationBundler.
+   *
+   * @param classAcceptor ClassAcceptor for class packages to include
+   */
+  public ApplicationBundler(ClassAcceptor classAcceptor) {
+    this.classAcceptor = classAcceptor;
+    ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+    for (String classpath : Splitter.on(File.pathSeparatorChar).split(System.getProperty("sun.boot.class.path"))) {
+      File file = new File(classpath);
+      builder.add(file.getAbsolutePath());
+      try {
+        builder.add(file.getCanonicalPath());
+      } catch (IOException e) {
+        // Ignore the exception and proceed.
+      }
+    }
+    this.bootstrapClassPaths = builder.build();
+    this.crc32 = new CRC32();
+    this.tempDir = new File(System.getProperty("java.io.tmpdir"));
+    this.classesDir = "classes/";
+    this.libDir = "lib/";
+    this.resourcesDir = "resources/";
+  }
+
+  /**
+   * Sets the temporary directory used by this class when generating new jars.
+   * By default it is using the {@code java.io.tmpdir} property.
+   */
+  public ApplicationBundler setTempDir(File tempDir) {
+    if (tempDir == null) {
+      throw new IllegalArgumentException("Temporary directory cannot be null");
+    }
+    this.tempDir = tempDir;
+    return this;
+  }
+
+  /**
+   * Sets the name of the directory inside the bundle jar that all ".class" files stored in.
+   * Passing in an empty string will store files at the root level inside the jar file.
+   * By default it is "classes".
+   */
+  public ApplicationBundler setClassesDir(String classesDir) {
+    if (classesDir == null) {
+      throw new IllegalArgumentException("Directory cannot be null");
+    }
+    this.classesDir = classesDir.endsWith("/") ? classesDir : classesDir + "/";
+    return this;
+  }
+
+  /**
+   * Sets the name of the directory inside the bundle jar that all ".jar" files stored in.
+   * Passing in an empty string will store files at the root level inside the jar file.
+   * By default it is "lib".
+   */
+  public ApplicationBundler setLibDir(String libDir) {
+    if (classesDir == null) {
+      throw new IllegalArgumentException("Directory cannot be null");
+    }
+    this.libDir = libDir.endsWith("/") ? libDir : libDir + "/";
+    return this;
+  }
+
+  /**
+   * Sets the name of the directory inside the bundle jar that all resource files stored in.
+   * Passing in an empty string will store files at the root level inside the jar file.
+   * By default it is "resources".
+   */
+  public ApplicationBundler setResourcesDir(String resourcesDir) {
+    if (classesDir == null) {
+      throw new IllegalArgumentException("Directory cannot be null");
+    }
+    this.resourcesDir = resourcesDir.endsWith("/") ? resourcesDir : resourcesDir + "/";
+    return this;
+  }
+
   public void createBundle(Location target, Iterable<Class<?>> classes) throws IOException {
     createBundle(target, classes, ImmutableList.<URI>of());
   }
@@ -142,12 +197,13 @@ public final class ApplicationBundler {
    * @param resources Extra resources to put into the jar file. If resource is a jar file, it'll be put under
    *                  lib/ entry, otherwise under the resources/ entry.
    * @param classes Set of classes to start the dependency traversal.
-   * @throws IOException
+   * @throws IOException if failed to create the bundle
    */
   public void createBundle(Location target, Iterable<Class<?>> classes, Iterable<URI> resources) throws IOException {
-    LOG.debug("start creating bundle {}. building a temporary file locally at first", target.getName());
+    LOG.debug("Start creating bundle at {}", target);
     // Write the jar to local tmp file first
-    File tmpJar = File.createTempFile(target.getName(), ".tmp");
+    File tmpJar = File.createTempFile(target.getName(), ".tmp", tempDir);
+    LOG.debug("First create bundle locally at {}", tmpJar);
     try {
       Set<String> entries = Sets.newHashSet();
       try (JarOutputStream jarOut = new JarOutputStream(new FileOutputStream(tmpJar))) {
@@ -159,22 +215,20 @@ public final class ApplicationBundler {
           copyResource(resource, entries, jarOut);
         }
       }
-      LOG.debug("copying temporary bundle to destination {} ({} bytes)", target, tmpJar.length());
+      LOG.debug("Copying temporary bundle to destination {} ({} bytes)", target, tmpJar.length());
       // Copy the tmp jar into destination.
-      try {
-        OutputStream os = new BufferedOutputStream(target.getOutputStream());
-        try {
-          Files.copy(tmpJar, os);
-        } finally {
-          Closeables.closeQuietly(os);
-        }
+      try (OutputStream os = new BufferedOutputStream(target.getOutputStream())) {
+        Files.copy(tmpJar, os);
       } catch (IOException e) {
-        throw new IOException("failed to copy bundle from " + tmpJar.toURI() + " to " + target, e);
+        throw new IOException("Failed to copy bundle from " + tmpJar + " to " + target, e);
       }
-      LOG.debug("finished creating bundle at {}", target);
+      LOG.debug("Finished creating bundle at {}", target);
     } finally {
-      tmpJar.delete();
-      LOG.debug("cleaned up local temporary for bundle {}", tmpJar.toURI());
+      if (!tmpJar.delete()) {
+        LOG.warn("Failed to cleanup local temporary file {}", tmpJar);
+      } else {
+        LOG.debug("Cleaned up local temporary file {}", tmpJar);
+      }
     }
   }
 
@@ -219,29 +273,29 @@ public final class ApplicationBundler {
       /* need unique name or else we lose classes (TWILL-181) we know the classPath is unique because it is
        * coming from a set, preserve as much as possible of it by prepending elements of the path until it is
        * unique. */
-      if (entries.contains(SUBDIR_LIB + entryName)) {
+      if (entries.contains(libDir + entryName)) {
         String[] parts = classPath.split("/");
         for (int i = parts.length - 2; i >= 0; i--) {
           entryName = parts[i] + "-" + entryName;
-          if (!entries.contains(SUBDIR_LIB + entryName)) {
+          if (!entries.contains(libDir + entryName)) {
             break;
           }
         }
       }
-      saveDirEntry(SUBDIR_LIB, entries, jarOut);
-      saveEntry(SUBDIR_LIB + entryName, classPathUrl, entries, jarOut, false);
+      saveDirEntry(libDir, entries, jarOut);
+      saveEntry(libDir + entryName, classPathUrl, entries, jarOut, false);
     } else {
       // Class file, put it under the classes directory
-      saveDirEntry(SUBDIR_CLASSES, entries, jarOut);
+      saveDirEntry(classesDir, entries, jarOut);
       if ("file".equals(classPathUrl.getProtocol())) {
         // Copy every files under the classPath
         try {
-          copyDir(new File(classPathUrl.toURI()), SUBDIR_CLASSES, entries, jarOut);
+          copyDir(new File(classPathUrl.toURI()), classesDir, entries, jarOut);
         } catch (Exception e) {
           throw Throwables.propagate(e);
         }
       } else {
-        String entry = SUBDIR_CLASSES + className.replace('.', '/') + ".class";
+        String entry = classesDir + className.replace('.', '/') + ".class";
         saveDirEntry(entry.substring(0, entry.lastIndexOf('/') + 1), entries, jarOut);
         saveEntry(entry, classUrl, entries, jarOut, true);
       }
@@ -349,15 +403,15 @@ public final class ApplicationBundler {
     if ("file".equals(resource.getScheme())) {
       File file = new File(resource);
       if (file.isDirectory()) {
-        saveDirEntry(SUBDIR_RESOURCES, entries, jarOut);
-        copyDir(file, SUBDIR_RESOURCES, entries, jarOut);
+        saveDirEntry(resourcesDir, entries, jarOut);
+        copyDir(file, resourcesDir, entries, jarOut);
         return;
       }
     }
 
     URL url = resource.toURL();
     String path = url.getFile();
-    String prefix = path.endsWith(".jar") ? SUBDIR_LIB : SUBDIR_RESOURCES;
+    String prefix = path.endsWith(".jar") ? libDir : resourcesDir;
     path = prefix + path.substring(path.lastIndexOf('/') + 1);
 
     if (entries.add(path)) {
@@ -371,7 +425,7 @@ public final class ApplicationBundler {
 
   private static final class TransferByteOutputStream extends ByteArrayOutputStream {
 
-    public void transfer(OutputStream os) throws IOException {
+    void transfer(OutputStream os) throws IOException {
       os.write(buf, 0, count);
     }
   }

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-core/src/main/java/org/apache/twill/internal/Configs.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Configs.java b/twill-core/src/main/java/org/apache/twill/internal/Configs.java
deleted file mode 100644
index 5e7033e..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/Configs.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-/**
- * Defines keys and default values constants being used for configuration.
- */
-public final class Configs {
-
-  /**
-   * Defines keys being used in configuration.
-   */
-  public static final class Keys {
-    /**
-     * Size in MB of reserved memory for Java process (non-heap memory).
-     */
-    public static final String JAVA_RESERVED_MEMORY_MB = "twill.java.reserved.memory.mb";
-
-    /**
-     * Set this to false to disable the secure store updates done by default.
-     */
-    public static final String SECURE_STORE_UPDATE_LOCATION_ENABLED = "twill.secure.store.update.location.enabled";
-
-    private Keys() {
-    }
-  }
-
-  /**
-   * Defines default configuration values.
-   */
-  public static final class Defaults {
-    // By default have 200MB reserved for Java process.
-    public static final int JAVA_RESERVED_MEMORY_MB = 200;
-
-    private Defaults() {
-    }
-  }
-
-  private Configs() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index ff8ddb2..b77a856 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -152,7 +152,6 @@ public final class TwillContainerLauncher {
       commandBuilder.add(jvmOpts.getExtraOptions());
     }
     commandBuilder.add(TwillLauncher.class.getName(),
-                       Constants.Files.CONTAINER_JAR,
                        mainClass.getName(),
                        Boolean.TRUE.toString());
     List<String> command = commandBuilder.build();

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-core/src/main/java/org/apache/twill/internal/io/BasicLocationCache.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/io/BasicLocationCache.java b/twill-core/src/main/java/org/apache/twill/internal/io/BasicLocationCache.java
new file mode 100644
index 0000000..030bcde
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/io/BasicLocationCache.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.io;
+
+import org.apache.twill.filesystem.Location;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A straightforward implementation of {@link LocationCache} that simply use location existence as the cache
+ * indicator.
+ */
+public class BasicLocationCache implements LocationCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BasicLocationCache.class);
+
+  private final Location cacheDir;
+
+  public BasicLocationCache(Location cacheDir) {
+    this.cacheDir = cacheDir;
+  }
+
+  @Override
+  public synchronized Location get(String name, Loader loader) throws IOException {
+    Location location = cacheDir.append(name);
+    if (location.exists()) {
+      LOG.debug("Cache hit for {} in {}", name, location);
+      return location;
+    }
+
+    LOG.debug("Cache miss for {}. Use Loader to save to {}", name, location);
+    loader.load(name, location);
+    return location;
+  }
+}

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-core/src/main/java/org/apache/twill/internal/io/LocationCache.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/io/LocationCache.java b/twill-core/src/main/java/org/apache/twill/internal/io/LocationCache.java
new file mode 100644
index 0000000..41b6aa0
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/io/LocationCache.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.io;
+
+import org.apache.twill.filesystem.Location;
+
+import java.io.IOException;
+
+/**
+ * Defines caching of {@link Location}.
+ */
+public interface LocationCache {
+
+  /**
+   * Gets the {@link Location} represented by the given name. If there is no such name exists in the cache,
+   * use the given {@link Loader} to populate the cache.
+   *
+   * @param name name of the caching entry
+   * @param loader the {@link Loader} for populating the content for the cache entry
+   * @return A {@link Location} associated with the cache name
+   * @throws IOException If failed to load the {@link Location}.
+   */
+  Location get(String name, Loader loader) throws IOException;
+
+  /**
+   * A loader to load the content of the given name.
+   */
+  abstract class Loader {
+
+    /**
+     * Loads content into the given target {@link Location}.
+     *
+     * @param name name of the cache entry
+     * @param targetLocation the target {@link Location} to store the content.
+     * @throws IOException If failed to populate the content.
+     */
+    public abstract void load(String name, Location targetLocation) throws IOException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-core/src/main/java/org/apache/twill/internal/io/NoCachingLocationCache.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/io/NoCachingLocationCache.java b/twill-core/src/main/java/org/apache/twill/internal/io/NoCachingLocationCache.java
new file mode 100644
index 0000000..e443252
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/io/NoCachingLocationCache.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.io;
+
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.utils.Paths;
+
+import java.io.IOException;
+
+/**
+ * A implementation of {@link LocationCache} that never cache any content.
+ * It always invokes {@link Loader#load(String, Location)} to load the content.
+ */
+public class NoCachingLocationCache implements LocationCache {
+
+  private final Location baseDir;
+
+  public NoCachingLocationCache(Location baseDir) {
+    this.baseDir = baseDir;
+  }
+
+  @Override
+  public Location get(String name, Loader loader) throws IOException {
+    String suffix = Paths.getExtension(name);
+    String prefix = name.substring(0, name.length() - suffix.length() - 1);
+    Location targetLocation = baseDir.append(prefix).getTempFile('.' + suffix);
+    loader.load(name, targetLocation);
+    return targetLocation;
+  }
+}

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-core/src/main/java/org/apache/twill/internal/utils/Paths.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/utils/Paths.java b/twill-core/src/main/java/org/apache/twill/internal/utils/Paths.java
index aeee09f..11c2fdf 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/utils/Paths.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/utils/Paths.java
@@ -17,15 +17,25 @@
  */
 package org.apache.twill.internal.utils;
 
-import com.google.common.io.Files;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
 
 /**
- *
+ * A utility class for manipulating paths.
  */
 public final class Paths {
 
-
-  public static String appendSuffix(String extractFrom, String appendTo) {
+  /**
+   * Returns a new path by appending an extension.
+   *
+   * @param extractFrom the path to extract the extension from
+   * @param appendTo the path to append the extension to
+   */
+  public static String addExtension(String extractFrom, String appendTo) {
     String suffix = getExtension(extractFrom);
     if (!suffix.isEmpty()) {
       return appendTo + '.' + suffix;
@@ -33,12 +43,37 @@ public final class Paths {
     return appendTo;
   }
 
+  /**
+   * Returns the file extension of the given file path. The file extension is defined by the suffix after the last
+   * dot character {@code .}. If there is no dot, an empty string will be returned.
+   */
   public static String getExtension(String path) {
     if (path.endsWith(".tar.gz")) {
       return "tar.gz";
     }
 
-    return Files.getFileExtension(path);
+    int idx = path.lastIndexOf('.');
+    return (idx >= 0) ? path.substring(idx + 1) : "";
+  }
+
+  /**
+   * Deletes the given path. If the path represents a directory, the content of it will be emptied recursively,
+   * followed by the removal of the directory itself.
+   */
+  public static void deleteRecursively(Path path) throws IOException {
+    Files.walkFileTree(path, new SimpleFileVisitor<Path>() {
+      @Override
+      public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+        Files.delete(file);
+        return FileVisitResult.CONTINUE;
+      }
+
+      @Override
+      public FileVisitResult postVisitDirectory(Path dir, IOException ex) throws IOException {
+        Files.delete(dir);
+        return FileVisitResult.CONTINUE;
+      }
+    });
   }
 
   private Paths() {

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java b/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
index 171b22a..8287947 100644
--- a/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
@@ -19,28 +19,25 @@ package org.apache.twill.launcher;
 
 import org.apache.twill.internal.Constants;
 
-import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.OutputStream;
 import java.lang.reflect.Method;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.jar.JarEntry;
-import java.util.jar.JarInputStream;
 
 /**
  * A launcher for application from a archive jar.
@@ -49,42 +46,30 @@ import java.util.jar.JarInputStream;
  */
 public final class TwillLauncher {
 
-  private static final int TEMP_DIR_ATTEMPTS = 20;
-
   /**
    * Main method to unpackage a jar and run the mainClass.main() method.
-   * @param args args[0] is the path to jar file, args[1] is the class name of the mainClass.
-   *             The rest of args will be passed the mainClass unmodified.
+   * @param args args[0] is the class name of the mainClass, args[1] is a boolean, telling whether to append classpath
+   *             from the "classpath.txt" runtime config jar or not. The rest of args are arguments to the mainClass.
    */
   public static void main(String[] args) throws Exception {
-    if (args.length < 3) {
-      System.out.println("Usage: java " + TwillLauncher.class.getName() + " [jarFile] [mainClass] [use_classpath]");
+    if (args.length < 2) {
+      System.out.println("Usage: java " + TwillLauncher.class.getName() + " [mainClass] [use_classpath] [args...]");
       return;
     }
 
-    File file = new File(args[0]);
-    final File targetDir = createTempDir("twill.launcher");
-
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        System.out.println("Cleanup directory " + targetDir);
-        deleteDir(targetDir);
-      }
-    });
-
-    System.out.println("UnJar " + file + " to " + targetDir);
-    unJar(file, targetDir);
+    String mainClassName = args[0];
+    boolean userClassPath = Boolean.parseBoolean(args[1]);
 
     // Create ClassLoader
-    URLClassLoader classLoader = createClassLoader(targetDir, Boolean.parseBoolean(args[2]));
+    URLClassLoader classLoader = createClassLoader(userClassPath);
     Thread.currentThread().setContextClassLoader(classLoader);
 
-    System.out.println("Launch class (" + args[1] + ") with classpath: " + Arrays.toString(classLoader.getURLs()));
+    System.out.println("Launch class (" + mainClassName + ") with classpath: " +
+                         Arrays.toString(classLoader.getURLs()));
 
-    Class<?> mainClass = classLoader.loadClass(args[1]);
+    Class<?> mainClass = classLoader.loadClass(mainClassName);
     Method mainMethod = mainClass.getMethod("main", String[].class);
-    String[] arguments = Arrays.copyOfRange(args, 3, args.length);
+    String[] arguments = Arrays.copyOfRange(args, 2, args.length);
     System.out.println("Launching main: " + mainMethod + " " + Arrays.toString(arguments));
     mainMethod.invoke(mainClass, new Object[]{arguments});
     System.out.println("Main class completed.");
@@ -92,92 +77,50 @@ public final class TwillLauncher {
     System.out.println("Launcher completed");
   }
 
-  /**
-   * This method is copied from Guava Files.createTempDir().
-   */
-  private static File createTempDir(String prefix) throws IOException {
-    File baseDir = new File(System.getProperty("java.io.tmpdir"));
-    if (!baseDir.isDirectory() && !baseDir.mkdirs()) {
-      throw new IOException("Tmp directory not exists: " + baseDir.getAbsolutePath());
-    }
+  private static URLClassLoader createClassLoader(boolean useClassPath) throws Exception {
+    List<URL> urls = new ArrayList<>();
 
-    String baseName = prefix + "-" + System.currentTimeMillis() + "-";
+    File appJarDir = new File(Constants.Files.APPLICATION_JAR);
+    File resourceJarDir = new File(Constants.Files.RESOURCES_JAR);
+    File twillJarDir = new File(Constants.Files.TWILL_JAR);
 
-    for (int counter = 0; counter < TEMP_DIR_ATTEMPTS; counter++) {
-      File tempDir = new File(baseDir, baseName + counter);
-      if (tempDir.mkdir()) {
-        return tempDir;
-      }
-    }
-    throw new IOException("Failed to create directory within "
-                            + TEMP_DIR_ATTEMPTS + " attempts (tried "
-                            + baseName + "0 to " + baseName + (TEMP_DIR_ATTEMPTS - 1) + ')');
-  }
-
-  private static void unJar(File jarFile, File targetDir) throws IOException {
-    try (JarInputStream jarInput = new JarInputStream(new FileInputStream(jarFile))) {
-      JarEntry jarEntry = jarInput.getNextJarEntry();
-      while (jarEntry != null) {
-        File target = new File(targetDir, jarEntry.getName());
-        if (jarEntry.isDirectory()) {
-          target.mkdirs();
-        } else {
-          target.getParentFile().mkdirs();
-          copy(jarInput, target);
-        }
-        jarEntry = jarInput.getNextJarEntry();
-      }
-    }
-  }
-
-  private static void copy(InputStream is, File file) throws IOException {
-    byte[] buf = new byte[8192];
-    try (OutputStream os = new BufferedOutputStream(new FileOutputStream(file))) {
-      int len = is.read(buf);
-      while (len != -1) {
-        os.write(buf, 0, len);
-        len = is.read(buf);
+    // For backward compatibility, sort jars from twill and jars from application together
+    // With TWILL-179, this will change as the user can have control on how it should be.
+    List<File> libJarFiles = listJarFiles(new File(appJarDir, "lib"), new ArrayList<File>());
+    Collections.sort(listJarFiles(new File(twillJarDir, "lib"), libJarFiles), new Comparator<File>() {
+      @Override
+      public int compare(File file1, File file2) {
+        // order by the file name only. If the name are the same, the one in application jar will prevail.
+        return file1.getName().compareTo(file2.getName());
       }
-    }
-  }
+    });
 
-  private static URLClassLoader createClassLoader(File dir, boolean useClassPath) {
-    try {
-      List<URL> urls = new ArrayList<>();
+    // Add the app jar, resources jar and twill jar directories to the classpath as well
+    for (File dir : Arrays.asList(appJarDir, resourceJarDir, twillJarDir)) {
       urls.add(dir.toURI().toURL());
       urls.add(new File(dir, "classes").toURI().toURL());
       urls.add(new File(dir, "resources").toURI().toURL());
+    }
 
-      File libDir = new File(dir, "lib");
-      for (File file : listFiles(libDir)) {
-        if (file.getName().endsWith(".jar")) {
-          urls.add(file.toURI().toURL());
-        }
-      }
-
-      if (useClassPath) {
-        addClassPathsToList(urls, Constants.CLASSPATH);
-      }
-
-      addClassPathsToList(urls, Constants.APPLICATION_CLASSPATH);
-
-      return new URLClassLoader(urls.toArray(new URL[urls.size()]));
+    // Add all lib jars
+    for (File jarFile : libJarFiles) {
+      urls.add(jarFile.toURI().toURL());
+    }
 
-    } catch (Exception e) {
-      throw new IllegalStateException(e);
+    if (useClassPath) {
+      addClassPathsToList(urls, new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.CLASSPATH));
     }
+
+    addClassPathsToList(urls, new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.APPLICATION_CLASSPATH));
+    return new URLClassLoader(urls.toArray(new URL[urls.size()]));
   }
 
-  private static void addClassPathsToList(List<URL> urls, String resource) throws IOException {
-    try (InputStream is = ClassLoader.getSystemResourceAsStream(resource)) {
-      if (is != null) {
-        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, Charset.forName("UTF-8")))) {
-          String line = reader.readLine();
-          if (line != null) {
-            for (String path : line.split(":")) {
-              urls.addAll(getClassPaths(path.trim()));
-            }
-          }
+  private static void addClassPathsToList(List<URL> urls, File classpathFile) throws IOException {
+    try (BufferedReader reader = Files.newBufferedReader(classpathFile.toPath(), StandardCharsets.UTF_8)) {
+      String line = reader.readLine();
+      if (line != null) {
+        for (String path : line.split(":")) {
+          urls.addAll(getClassPaths(path.trim()));
         }
       }
     }
@@ -188,7 +131,8 @@ public final class TwillLauncher {
     if (classpath.endsWith("/*")) {
       // Grab all .jar files
       File dir = new File(classpath.substring(0, classpath.length() - 2));
-      List<File> files = listFiles(dir);
+      List<File> files = listJarFiles(dir, new ArrayList<File>());
+      Collections.sort(files);
       if (files.isEmpty()) {
         return singleItem(dir.toURI().toURL());
       }
@@ -220,29 +164,20 @@ public final class TwillLauncher {
     return result;
   }
 
-  private static void deleteDir(File dir) {
-    File[] files = dir.listFiles();
-    if (files == null || files.length == 0) {
-      dir.delete();
-      return;
-    }
-    for (File file : files) {
-      deleteDir(file);
-    }
-    dir.delete();
-  }
-
   /**
-   * Returns a sorted list of {@link File} under the given directory. The list will be empty if
-   * the given directory is empty, not exist or not a directory.
+   * Populates a list of {@link File} under the given directory that has ".jar" as extension.
    */
-  private static List<File> listFiles(File dir) {
+  private static List<File> listJarFiles(File dir, List<File> result) {
+    System.out.println("listing jars for " + dir.getAbsolutePath());
     File[] files = dir.listFiles();
     if (files == null || files.length == 0) {
-      return Collections.emptyList();
+      return result;
     }
-    List<File> fileList = Arrays.asList(files);
-    Collections.sort(fileList);
-    return fileList;
+    for (File file : files) {
+      if (file.getName().endsWith(".jar")) {
+        result.add(file);
+      }
+    }
+    return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-core/src/test/java/org/apache/twill/internal/DebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/DebugOptionsTest.java b/twill-core/src/test/java/org/apache/twill/internal/DebugOptionsTest.java
index 2a07987..df96973 100644
--- a/twill-core/src/test/java/org/apache/twill/internal/DebugOptionsTest.java
+++ b/twill-core/src/test/java/org/apache/twill/internal/DebugOptionsTest.java
@@ -24,7 +24,7 @@ import org.junit.Test;
 import java.util.Set;
 
 /**
- * Unit test for {@link org.apache.twill.internal.JvmOptions.DebugOptions} class
+ * Unit test for {@link org.apache.twill.internal.JvmOptions.DebugOptions} class.
  */
 public class DebugOptionsTest {
 

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index 5065d07..91256c8 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -186,7 +186,7 @@ public abstract class ServiceMain {
     configurator.setContext(context);
 
     try {
-      File twillLogback = new File(Constants.Files.LOGBACK_TEMPLATE);
+      File twillLogback = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.LOGBACK_TEMPLATE);
       if (twillLogback.exists()) {
         configurator.doConfigure(twillLogback);
       }

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
index 028df7b..dd4d946 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
@@ -17,6 +17,10 @@
  */
 package org.apache.twill.internal.appmaster;
 
+import org.apache.twill.api.LocalFile;
+
+import java.util.List;
+
 /**
  * Represents data being stored in the live node of the application master.
  */
@@ -25,11 +29,14 @@ public final class ApplicationMasterLiveNodeData {
   private final int appId;
   private final long appIdClusterTime;
   private final String containerId;
+  private final List<LocalFile> localFiles;
 
-  public ApplicationMasterLiveNodeData(int appId, long appIdClusterTime, String containerId) {
+  public ApplicationMasterLiveNodeData(int appId, long appIdClusterTime,
+                                       String containerId, List<LocalFile> localFiles) {
     this.appId = appId;
     this.appIdClusterTime = appIdClusterTime;
     this.containerId = containerId;
+    this.localFiles = localFiles;
   }
 
   public int getAppId() {
@@ -43,4 +50,18 @@ public final class ApplicationMasterLiveNodeData {
   public String getContainerId() {
     return containerId;
   }
+
+  public List<LocalFile> getLocalFiles() {
+    return localFiles;
+  }
+
+  @Override
+  public String toString() {
+    return "ApplicationMasterLiveNodeData{" +
+      "appId=" + appId +
+      ", appIdClusterTime=" + appIdClusterTime +
+      ", containerId='" + containerId + '\'' +
+      ", localFiles=" + localFiles +
+      '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 3f68bc3..8dd5046 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -65,7 +65,7 @@ public final class ApplicationMasterMain extends ServiceMain {
    * Starts the application master.
    */
   public static void main(String[] args) throws Exception {
-    File twillSpec = new File(Constants.Files.TWILL_SPEC);
+    File twillSpec = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.TWILL_SPEC);
     TwillRuntimeSpecification twillRuntimeSpec = TwillRuntimeSpecificationAdapter.create().fromJson(twillSpec);
     String zkConnect = twillRuntimeSpec.getZkConnectStr();
     RunId runId = twillRuntimeSpec.getTwillAppRunId();

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index c1c3986..4aa3800 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -17,7 +17,6 @@
  */
 package org.apache.twill.internal.appmaster;
 
-import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
@@ -32,7 +31,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Ranges;
 import com.google.common.collect.Sets;
-import com.google.common.io.Files;
 import com.google.common.io.InputSupplier;
 import com.google.common.reflect.TypeToken;
 import com.google.common.util.concurrent.Futures;
@@ -88,6 +86,10 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -109,7 +111,10 @@ import javax.annotation.Nullable;
 public final class ApplicationMasterService extends AbstractYarnTwillService implements Supplier<ResourceReport> {
 
   private static final Logger LOG = LoggerFactory.getLogger(ApplicationMasterService.class);
-  private static final Gson GSON = new Gson();
+  private static final Gson GSON = new GsonBuilder()
+    .serializeNulls()
+    .registerTypeAdapter(LocalFile.class, new LocalFileCodec())
+    .create();
 
   // Copied from org.apache.hadoop.yarn.security.AMRMTokenIdentifier.KIND_NAME since it's missing in Hadoop-2.0
   private static final Text AMRM_TOKEN_KIND_NAME = new Text("YARN_AM_RM_TOKEN");
@@ -151,15 +156,15 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
 
     this.amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv(EnvKeys.YARN_APP_ID)),
                                                         Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)),
-                                                        amClient.getContainerId().toString());
+                                                        amClient.getContainerId().toString(), getLocalizeFiles());
 
-    expectedContainers = initExpectedContainers(twillSpec);
-    runningContainers = initRunningContainers(amClient.getContainerId(), amClient.getHost());
-    eventHandler = createEventHandler(twillSpec);
+    this.expectedContainers = initExpectedContainers(twillSpec);
+    this.runningContainers = initRunningContainers(amClient.getContainerId(), amClient.getHost());
+    this.eventHandler = createEventHandler(twillSpec);
   }
 
   private JvmOptions loadJvmOptions() throws IOException {
-    final File jvmOptsFile = new File(Constants.Files.JVM_OPTIONS);
+    final File jvmOptsFile = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.JVM_OPTIONS);
     if (!jvmOptsFile.exists()) {
       return new JvmOptions(null, JvmOptions.DebugOptions.NO_DEBUG);
     }
@@ -302,6 +307,11 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
   }
 
   @Override
+  protected Gson getLiveNodeGson() {
+    return GSON;
+  }
+
+  @Override
   public ListenableFuture<String> onReceived(String messageId, Message message) {
     LOG.debug("Message received: {} {}.", messageId, message);
 
@@ -667,9 +677,9 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
       // Override with system env
       env.put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect());
 
-      ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(env, getLocalizeFiles(),
+      ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(env,
+                                                                                         amLiveNode.getLocalFiles(),
                                                                                          credentials);
-
       TwillContainerLauncher launcher = new TwillContainerLauncher(
         twillSpec.getRunnables().get(runnableName), processLauncher.getContainerInfo(), launchContext,
         ZKClients.namespace(zkClient, getZKNamespace(runnableName)),
@@ -693,7 +703,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
   }
 
   private List<LocalFile> getLocalizeFiles() {
-    try (Reader reader = Files.newReader(new File(Constants.Files.LOCALIZE_FILES), Charsets.UTF_8)) {
+    try (Reader reader = Files.newBufferedReader(Paths.get(Constants.Files.LOCALIZE_FILES), StandardCharsets.UTF_8)) {
       return new GsonBuilder().registerTypeAdapter(LocalFile.class, new LocalFileCodec())
         .create().fromJson(reader, new TypeToken<List<LocalFile>>() {
         }.getType());
@@ -703,12 +713,12 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
   }
 
   private Map<String, Map<String, String>> getEnvironments() {
-    File envFile = new File(Constants.Files.ENVIRONMENTS);
-    if (!envFile.exists()) {
+    Path envFile = Paths.get(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.ENVIRONMENTS);
+    if (!Files.exists(envFile)) {
       return new HashMap<>();
     }
 
-    try (Reader reader = Files.newReader(envFile, Charsets.UTF_8)) {
+    try (Reader reader = Files.newBufferedReader(envFile, StandardCharsets.UTF_8)) {
       return new Gson().fromJson(reader, new TypeToken<Map<String, Map<String, String>>>() {
       }.getType());
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 1f8d689..e37cd44 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -70,13 +70,13 @@ public final class TwillContainerMain extends ServiceMain {
    * a {@link org.apache.twill.api.TwillRunnable}.
    */
   public static void main(String[] args) throws Exception {
-    new TwillContainerMain().doMain(args);
+    new TwillContainerMain().doMain();
   }
 
-  private void doMain(String[] args) throws Exception {
+  private void doMain() throws Exception {
     // Try to load the secure store from localized file, which AM requested RM to localize it for this container.
     loadSecureStore();
-    File twillSpecFile = new File(Constants.Files.TWILL_SPEC);
+    File twillSpecFile = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.TWILL_SPEC);
     TwillRuntimeSpecification twillRuntimeSpec = loadTwillSpec(twillSpecFile);
     String zkConnectStr = twillRuntimeSpec.getZkConnectStr();
     RunId appRunId = twillRuntimeSpec.getTwillAppRunId();
@@ -183,7 +183,8 @@ public final class TwillContainerMain extends ServiceMain {
   }
 
   private static Arguments decodeArgs() throws IOException {
-    return ArgumentsCodec.decode(Files.newReaderSupplier(new File(Constants.Files.ARGUMENTS), Charsets.UTF_8));
+    return ArgumentsCodec.decode(
+      Files.newReaderSupplier(new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.ARGUMENTS), Charsets.UTF_8));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-yarn/src/main/java/org/apache/twill/yarn/ApplicationMasterLiveNodeDecoder.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/ApplicationMasterLiveNodeDecoder.java b/twill-yarn/src/main/java/org/apache/twill/yarn/ApplicationMasterLiveNodeDecoder.java
new file mode 100644
index 0000000..4b2fc42
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/ApplicationMasterLiveNodeDecoder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import com.google.common.base.Charsets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.internal.appmaster.ApplicationMasterLiveNodeData;
+import org.apache.twill.internal.json.LocalFileCodec;
+import org.apache.twill.zookeeper.NodeData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/**
+ * A package local class to help decoding {@link NodeData} to {@link ApplicationMasterLiveNodeDecoder}.
+ * This is only used to have a central for the decoding logic, that is shared between {@link YarnTwillRunnerService}
+ * and {@link YarnTwillController}.
+ */
+final class ApplicationMasterLiveNodeDecoder {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ApplicationMasterLiveNodeDecoder.class);
+  private static final Gson GSON = new GsonBuilder()
+    .registerTypeAdapter(LocalFile.class, new LocalFileCodec())
+    .create();
+
+
+  /**
+   * Decodes the {@link ApplicationMasterLiveNodeData} from the given ZK node data.
+   *
+   * @return the {@link ApplicationMasterLiveNodeData} or {@code null} if failed to decode.
+   */
+  @Nullable
+  static ApplicationMasterLiveNodeData decode(@Nullable NodeData nodeData) {
+    byte[] data = nodeData == null ? null : nodeData.getData();
+    if (data == null) {
+      return null;
+    }
+
+    JsonElement json = GSON.fromJson(new String(data, Charsets.UTF_8), JsonElement.class);
+    if (!json.isJsonObject()) {
+      LOG.warn("Unable to decode live data node.");
+      return null;
+    }
+
+    JsonObject jsonObj = json.getAsJsonObject();
+    json = jsonObj.get("data");
+    if (json == null || !json.isJsonObject()) {
+      LOG.warn("Property data not found in live data node.");
+      return null;
+    }
+
+    try {
+      return GSON.fromJson(json, ApplicationMasterLiveNodeData.class);
+    } catch (Exception e) {
+      LOG.warn("Failed to decode application live node data.", e);
+      return null;
+    }
+  }
+
+  private ApplicationMasterLiveNodeDecoder() {
+    // no-op
+  }
+}

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
new file mode 100644
index 0000000..0738218
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.twill.api.Configs;
+import org.apache.twill.common.Threads;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.io.LocationCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Responsible for cleanup of {@link LocationCache}.
+ */
+final class LocationCacheCleaner extends AbstractIdleService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LocationCacheCleaner.class);
+
+  private final Location cacheBaseLocation;
+  private final String sessionId;
+  private final long expiry;
+  private final long antiqueExpiry;
+  private final Predicate<Location> cleanupPredicate;
+  private final Set<PendingCleanup> pendingCleanups;
+  private ScheduledExecutorService scheduler;
+
+  LocationCacheCleaner(Configuration config, Location cacheBaseLocation,
+                       String sessionId, Predicate<Location> cleanupPredicate) {
+    this.cacheBaseLocation = cacheBaseLocation;
+    this.sessionId = sessionId;
+    this.expiry = config.getLong(Configs.Keys.LOCATION_CACHE_EXPIRY_MS,
+                                 Configs.Defaults.LOCATION_CACHE_EXPIRY_MS);
+    this.antiqueExpiry = config.getLong(Configs.Keys.LOCATION_CACHE_ANTIQUE_EXPIRY_MS,
+                                        Configs.Defaults.LOCATION_CACHE_ANTIQUE_EXPIRY_MS);
+    this.cleanupPredicate = cleanupPredicate;
+    this.pendingCleanups = new HashSet<>();
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("location-cache-cleanup"));
+    scheduler.execute(new Runnable() {
+      @Override
+      public void run() {
+        long currentTime = System.currentTimeMillis();
+        cleanup(currentTime);
+
+        // By default, run the cleanup at half of the expiry
+        long scheduleDelay = expiry / 2;
+        for (PendingCleanup pendingCleanup : pendingCleanups) {
+          // If there is any pending cleanup that needs to be cleanup early, schedule the run earlier.
+          if (pendingCleanup.getExpireTime() - currentTime < scheduleDelay) {
+            scheduleDelay = pendingCleanup.getExpireTime() - currentTime;
+          }
+        }
+        scheduler.schedule(this, scheduleDelay, TimeUnit.MILLISECONDS);
+      }
+    });
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    scheduler.shutdownNow();
+  }
+
+  @VisibleForTesting
+  void forceCleanup(final long currentTime) {
+    Futures.getUnchecked(scheduler.submit(new Runnable() {
+      @Override
+      public void run() {
+        cleanup(currentTime);
+      }
+    }));
+  }
+
+  /**
+   * Performs cleanup based on the given time.
+   */
+  private void cleanup(long currentTime) {
+    // First go through the pending cleanup list and remove those that can be removed
+    Iterator<PendingCleanup> iterator = pendingCleanups.iterator();
+    while (iterator.hasNext()) {
+      PendingCleanup pendingCleanup = iterator.next();
+
+      // If rejected by the predicate, it means it is being used, hence remove it from the pending cleanup list.
+      if (!cleanupPredicate.apply(pendingCleanup.getLocation())) {
+        iterator.remove();
+      } else {
+        try {
+          // If time is up for the pending entry, the location will be deleted,
+          // hence can be removed from the pending cleanup list.
+          // Otherwise retain it for the next cycle.
+          if (pendingCleanup.deleteIfExpired(currentTime)) {
+            iterator.remove();
+          }
+        } catch (IOException e) {
+          // Log and retain the entry so that another attempt on deletion will be made in next cleanup cycle
+          LOG.warn("Failed to delete {}", pendingCleanup.getLocation(), e);
+        }
+      }
+    }
+
+    // Then collects the next set of locations to be removed
+    try {
+      for (Location cacheDir : cacheBaseLocation.list()) {
+        try {
+          for (Location location : cacheDir.list()) {
+            if (cleanupPredicate.apply(location)) {
+              long expireTime = currentTime;
+              if (cacheDir.getName().equals(sessionId)) {
+                expireTime += expiry;
+              } else {
+                // If the cache entry is from different YarnTwillRunnerService session, use the anti expiry time.
+                expireTime += antiqueExpiry;
+              }
+              // If the location is already pending for cleanup, this won't update the expire time as
+              // the comparison of PendingCleanup is only by location.
+              pendingCleanups.add(new PendingCleanup(location, expireTime));
+            }
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to list cache content from {}", cacheDir, e);
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to list cache directories from {}", cacheBaseLocation, e);
+    }
+  }
+
+  /**
+   * Class for holding information about cache location that is pending to be removed.
+   * The equality and hash code is only based on the location.
+   */
+  private static final class PendingCleanup {
+    private final Location location;
+    private final long expireTime;
+
+    PendingCleanup(Location location, long expireTime) {
+      this.location = location;
+      this.expireTime = expireTime;
+    }
+
+    Location getLocation() {
+      return location;
+    }
+
+    long getExpireTime() {
+      return expireTime;
+    }
+
+    /**
+     * Deletes the location in this class if it is expired according to the given current time.
+     *
+     * @return true if expired and attempt was made to delete the location
+     */
+    boolean deleteIfExpired(long currentTime) throws IOException {
+      if (currentTime < expireTime) {
+        return false;
+      }
+      if (location.delete()) {
+        LOG.debug("Cached location removed {}", location);
+      } else {
+        // It's ok to have delete returns false, e.g. if the location is removed by some other process
+        LOG.debug("Failed to delete cached location {}", location);
+      }
+      return true;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      PendingCleanup that = (PendingCleanup) o;
+      return location.equals(that.location);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(location);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/twill/blob/5986553b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
index 265c453..61306d6 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
@@ -19,7 +19,6 @@ package org.apache.twill.yarn;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -32,9 +31,12 @@ import org.apache.twill.api.logging.LogHandler;
 import org.apache.twill.internal.AbstractTwillController;
 import org.apache.twill.internal.Constants;
 import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.appmaster.ApplicationMasterLiveNodeData;
 import org.apache.twill.internal.appmaster.TrackerService;
 import org.apache.twill.internal.state.SystemMessages;
+import org.apache.twill.internal.yarn.YarnAppClient;
 import org.apache.twill.internal.yarn.YarnApplicationReport;
+import org.apache.twill.internal.yarn.YarnUtils;
 import org.apache.twill.zookeeper.NodeData;
 import org.apache.twill.zookeeper.ZKClient;
 import org.apache.zookeeper.data.Stat;
@@ -44,10 +46,12 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URL;
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
 
 /**
  * A {@link org.apache.twill.api.TwillController} that controllers application running on Hadoop YARN.
@@ -60,6 +64,7 @@ final class YarnTwillController extends AbstractTwillController implements Twill
   private final Callable<ProcessController<YarnApplicationReport>> startUp;
   private final long startTimeout;
   private final TimeUnit startTimeoutUnit;
+  private volatile ApplicationMasterLiveNodeData amLiveNodeData;
   private ProcessController<YarnApplicationReport> processController;
   private ResourceReportClient resourcesClient;
 
@@ -68,12 +73,22 @@ final class YarnTwillController extends AbstractTwillController implements Twill
   private Thread statusPollingThread;
 
   /**
-   * Creates an instance without any {@link LogHandler}.
+   * Creates an instance with an existing {@link ApplicationMasterLiveNodeData}.
    */
   YarnTwillController(String appName, RunId runId, ZKClient zkClient,
-                      Callable<ProcessController<YarnApplicationReport>> startUp) {
-    this(appName, runId, zkClient, ImmutableList.<LogHandler>of(), startUp,
-         Constants.APPLICATION_MAX_START_SECONDS, TimeUnit.SECONDS);
+                      final ApplicationMasterLiveNodeData amLiveNodeData, final YarnAppClient yarnAppClient) {
+    super(runId, zkClient, Collections.<LogHandler>emptyList());
+    this.appName = appName;
+    this.amLiveNodeData = amLiveNodeData;
+    this.startUp = new Callable<ProcessController<YarnApplicationReport>>() {
+      @Override
+      public ProcessController<YarnApplicationReport> call() throws Exception {
+        return yarnAppClient.createProcessController(
+          YarnUtils.createApplicationId(amLiveNodeData.getAppIdClusterTime(), amLiveNodeData.getAppId()));
+      }
+    };
+    this.startTimeout = Constants.APPLICATION_MAX_START_SECONDS;
+    this.startTimeoutUnit = TimeUnit.SECONDS;
   }
 
   YarnTwillController(String appName, RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers,
@@ -94,6 +109,11 @@ final class YarnTwillController extends AbstractTwillController implements Twill
     return sendMessage(SystemMessages.SECURE_STORE_UPDATED, null);
   }
 
+  @Nullable
+  ApplicationMasterLiveNodeData getApplicationMasterLiveNodeData() {
+    return amLiveNodeData;
+  }
+
   @Override
   protected void doStartUp() {
     super.doStartUp();
@@ -195,6 +215,10 @@ final class YarnTwillController extends AbstractTwillController implements Twill
 
   @Override
   protected void instanceNodeUpdated(NodeData nodeData) {
+    ApplicationMasterLiveNodeData data = ApplicationMasterLiveNodeDecoder.decode(nodeData);
+    if (data != null) {
+      amLiveNodeData = data;
+    }
   }
 
   @Override