You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 22:59:46 UTC

[04/28] Making maven site work.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
deleted file mode 100644
index 17425d4..0000000
--- a/yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ /dev/null
@@ -1,600 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.CharStreams;
-import com.google.common.io.OutputSupplier;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.GsonBuilder;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.twill.api.EventHandlerSpecification;
-import org.apache.twill.api.LocalFile;
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.RuntimeSpecification;
-import org.apache.twill.api.SecureStore;
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.TwillPreparer;
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.api.logging.LogHandler;
-import org.apache.twill.filesystem.Location;
-import org.apache.twill.filesystem.LocationFactory;
-import org.apache.twill.internal.ApplicationBundler;
-import org.apache.twill.internal.Arguments;
-import org.apache.twill.internal.Configs;
-import org.apache.twill.internal.Constants;
-import org.apache.twill.internal.DefaultLocalFile;
-import org.apache.twill.internal.DefaultRuntimeSpecification;
-import org.apache.twill.internal.DefaultTwillSpecification;
-import org.apache.twill.internal.EnvKeys;
-import org.apache.twill.internal.LogOnlyEventHandler;
-import org.apache.twill.internal.ProcessController;
-import org.apache.twill.internal.ProcessLauncher;
-import org.apache.twill.internal.RunIds;
-import org.apache.twill.internal.appmaster.ApplicationMasterMain;
-import org.apache.twill.internal.container.TwillContainerMain;
-import org.apache.twill.internal.json.ArgumentsCodec;
-import org.apache.twill.internal.json.LocalFileCodec;
-import org.apache.twill.internal.json.TwillSpecificationAdapter;
-import org.apache.twill.internal.utils.Dependencies;
-import org.apache.twill.internal.utils.Paths;
-import org.apache.twill.internal.yarn.YarnAppClient;
-import org.apache.twill.internal.yarn.YarnApplicationReport;
-import org.apache.twill.internal.yarn.YarnUtils;
-import org.apache.twill.launcher.TwillLauncher;
-import org.apache.twill.zookeeper.ZKClient;
-import org.apache.twill.zookeeper.ZKClients;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.jar.JarEntry;
-import java.util.jar.JarOutputStream;
-
-/**
- * Implementation for {@link TwillPreparer} to prepare and launch distributed application on Hadoop YARN.
- */
-final class YarnTwillPreparer implements TwillPreparer {
-
-  private static final Logger LOG = LoggerFactory.getLogger(YarnTwillPreparer.class);
-  // Name of the classpath resource that saveKafka() copies into the application directory.
-  private static final String KAFKA_ARCHIVE = "kafka-0.7.2.tgz";
-
-  // Collaborators handed in through the constructor.
-  private final YarnConfiguration yarnConfig;
-  private final TwillSpecification twillSpec;
-  private final YarnAppClient yarnAppClient;
-  // The constructor namespaces this client under "/" + application name.
-  private final ZKClient zkClient;
-  private final LocationFactory locationFactory;
-  // Queried inside the submit task built by start() to obtain the extra JVM options string.
-  private final Supplier<String> jvmOpts;
-  private final YarnTwillControllerFactory controllerFactory;
-  // Generated once per preparer instance in the constructor.
-  private final RunId runId;
-
-  // Launch settings accumulated through the fluent add*/with* methods and consumed by start().
-  private final List<LogHandler> logHandlers = Lists.newArrayList();
-  private final List<String> arguments = Lists.newArrayList();
-  private final Set<Class<?>> dependencies = Sets.newIdentityHashSet();
-  private final List<URI> resources = Lists.newArrayList();
-  private final List<String> classPaths = Lists.newArrayList();
-  // Arguments keyed by runnable name.
-  private final ListMultimap<String, String> runnableArgs = ArrayListMultimap.create();
-  // Built by createCredentials() and extended by addSecureStore().
-  private final Credentials credentials;
-  // Read from Configs.Keys.JAVA_RESERVED_MEMORY_MB; exported to the AM via EnvKeys.TWILL_RESERVED_MEMORY_MB.
-  private final int reservedMemory;
-  // Defaults to the "user.name" system property; can be overridden with setUser().
-  private String user;
-
-  /**
-   * Creates a preparer for the given application specification.
-   *
-   * @param yarnConfig YARN configuration; also consulted for the reserved JVM memory setting
-   * @param twillSpec specification of the application to launch
-   * @param yarnAppClient client used to create the application launcher
-   * @param zkClient ZooKeeper client; it gets namespaced under "/" + application name
-   * @param locationFactory factory for the locations that hold the generated files
-   * @param jvmOpts supplier of the extra JVM options
-   * @param controllerFactory factory for the controller returned by {@link #start()}
-   */
-  YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec, YarnAppClient yarnAppClient,
-                    ZKClient zkClient, LocationFactory locationFactory, Supplier<String> jvmOpts,
-                    YarnTwillControllerFactory controllerFactory) {
-    // createCredentials() reads yarnConfig and locationFactory, so both have to be assigned before it runs.
-    this.yarnConfig = yarnConfig;
-    this.locationFactory = locationFactory;
-    this.twillSpec = twillSpec;
-    this.yarnAppClient = yarnAppClient;
-    this.jvmOpts = jvmOpts;
-    this.controllerFactory = controllerFactory;
-
-    String zkNamespace = "/" + twillSpec.getName();
-    this.zkClient = ZKClients.namespace(zkClient, zkNamespace);
-
-    this.runId = RunIds.generate();
-    this.credentials = createCredentials();
-    this.reservedMemory = yarnConfig.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB,
-                                            Configs.Defaults.JAVA_RESERVED_MEMORY_MB);
-    this.user = System.getProperty("user.name");
-  }
-
-  /**
-   * Registers a log handler; every registered handler is passed to the controller created by {@link #start()}.
-   */
-  @Override
-  public TwillPreparer addLogHandler(LogHandler handler) {
-    this.logHandlers.add(handler);
-    return this;
-  }
-
-  /**
-   * Overrides the user name passed to the launcher; initially the "user.name" system property.
-   */
-  @Override
-  public TwillPreparer setUser(String user) {
-    // The parameter shadows the field, hence the explicit qualifier.
-    this.user = user;
-    return this;
-  }
-
-  /**
-   * Varargs convenience form; delegates to the {@link Iterable} overload.
-   */
-  @Override
-  public TwillPreparer withApplicationArguments(String... args) {
-    List<String> argList = ImmutableList.copyOf(args);
-    return withApplicationArguments(argList);
-  }
-
-  /**
-   * Appends the given values to the application-level arguments.
-   */
-  @Override
-  public TwillPreparer withApplicationArguments(Iterable<String> args) {
-    Iterables.addAll(this.arguments, args);
-    return this;
-  }
-
-  /**
-   * Varargs convenience form; delegates to the {@link Iterable} overload.
-   */
-  @Override
-  public TwillPreparer withArguments(String runnableName, String... args) {
-    List<String> argList = ImmutableList.copyOf(args);
-    return withArguments(runnableName, argList);
-  }
-
-  /**
-   * Appends the given values to the arguments recorded for the named runnable.
-   */
-  @Override
-  public TwillPreparer withArguments(String runnableName, Iterable<String> args) {
-    this.runnableArgs.putAll(runnableName, args);
-    return this;
-  }
-
-  @Override
-  public TwillPreparer withDependencies(Class<?>... classes) {
-    return withDependencies(ImmutableList.copyOf(classes));
-  }
-
-  /**
-   * Records extra classes; they are added to the class set bundled by createContainerJar().
-   */
-  @Override
-  public TwillPreparer withDependencies(Iterable<Class<?>> classes) {
-    Iterables.addAll(this.dependencies, classes);
-    return this;
-  }
-
-  /**
-   * Varargs convenience form; delegates to the {@link Iterable} overload.
-   */
-  @Override
-  public TwillPreparer withResources(URI... resources) {
-    List<URI> resourceList = ImmutableList.copyOf(resources);
-    return withResources(resourceList);
-  }
-
-  /**
-   * Records extra resource URIs; they are handed to the bundler by createContainerJar().
-   */
-  @Override
-  public TwillPreparer withResources(Iterable<URI> resources) {
-    // The parameter shadows the field, so the target list has to be qualified.
-    List<URI> target = this.resources;
-    Iterables.addAll(target, resources);
-    return this;
-  }
-
-  /**
-   * Varargs convenience form; delegates to the {@link Iterable} overload.
-   */
-  @Override
-  public TwillPreparer withClassPaths(String... classPaths) {
-    List<String> classPathList = ImmutableList.copyOf(classPaths);
-    return withClassPaths(classPathList);
-  }
-
-  /**
-   * Records extra classpath entries; saveLauncher() writes them into the launcher jar.
-   */
-  @Override
-  public TwillPreparer withClassPaths(Iterable<String> classPaths) {
-    // The parameter shadows the field, so the target list has to be qualified.
-    List<String> target = this.classPaths;
-    Iterables.addAll(target, classPaths);
-    return this;
-  }
-
-  @Override
-  public TwillPreparer addSecureStore(SecureStore secureStore) {
-    Object store = secureStore.getStore();
-    Preconditions.checkArgument(store instanceof Credentials, "Only Hadoop Credentials is supported.");
-    this.credentials.mergeAll((Credentials) store);
-    return this;
-  }
-
-  /**
-   * Creates the launcher for the application and returns a started controller for it.
-   *
-   * The work of generating the files and launching the application master is wrapped in a
-   * {@link Callable} that is handed to the controller factory rather than being invoked directly here.
-   *
-   * @return the started {@link TwillController}
-   * @throws RuntimeException (via {@link Throwables#propagate}) if creating the launcher or the controller fails
-   */
-  @Override
-  public TwillController start() {
-    try {
-      final ProcessLauncher<ApplicationId> launcher = yarnAppClient.createLauncher(user, twillSpec);
-      final ApplicationId appId = launcher.getContainerInfo();
-
-      Callable<ProcessController<YarnApplicationReport>> submitTask =
-        new Callable<ProcessController<YarnApplicationReport>>() {
-        @Override
-        public ProcessController<YarnApplicationReport> call() throws Exception {
-          String fsUser = locationFactory.getHomeLocation().getName();
-
-          // Local files needed by AM
-          Map<String, LocalFile> localFiles = Maps.newHashMap();
-          // Local files declared by runnables
-          Multimap<String, LocalFile> runnableLocalFiles = HashMultimap.create();
-
-          String vmOpts = jvmOpts.get();
-
-          // Each step below writes one file under the application location and registers it in localFiles.
-          createAppMasterJar(createBundler(), localFiles);
-          createContainerJar(createBundler(), localFiles);
-          // Must run before saveSpecification, which rewrites the spec with the populated runnable files.
-          populateRunnableLocalFiles(twillSpec, runnableLocalFiles);
-          saveSpecification(twillSpec, runnableLocalFiles, localFiles);
-          saveLogback(localFiles);
-          saveLauncher(localFiles);
-          saveKafka(localFiles);
-          saveVmOptions(vmOpts, localFiles);
-          saveArguments(new Arguments(arguments, runnableArgs), localFiles);
-          // Must run last: it serializes the subset of already-registered files named in the set below.
-          saveLocalFiles(localFiles, ImmutableSet.of(Constants.Files.TWILL_SPEC,
-                                                     Constants.Files.LOGBACK_TEMPLATE,
-                                                     Constants.Files.CONTAINER_JAR,
-                                                     Constants.Files.LAUNCHER_JAR,
-                                                     Constants.Files.ARGUMENTS));
-
-          LOG.debug("Submit AM container spec: {}", appId);
-          // The launched command has the form:
-          // java -Djava.io.tmpdir=tmp -Dyarn.appId=... -Dtwill.app=... -cp launcher.jar:$HADOOP_CONF_DIR -XmxMemory
-          //     <vmOpts>
-          //     org.apache.twill.launcher.TwillLauncher
-          //     appMaster.jar
-          //     org.apache.twill.internal.appmaster.ApplicationMasterMain
-          //     false
-          return launcher.prepareLaunch(
-            ImmutableMap.<String, String>builder()
-              .put(EnvKeys.TWILL_FS_USER, fsUser)
-              .put(EnvKeys.TWILL_APP_DIR, getAppLocation().toURI().toASCIIString())
-              .put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString())
-              .put(EnvKeys.TWILL_RUN_ID, runId.getId())
-              .put(EnvKeys.TWILL_RESERVED_MEMORY_MB, Integer.toString(reservedMemory))
-              .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName()).build(),
-            localFiles.values(), credentials)
-            .noResources()
-            .noEnvironment()
-            .withCommands().add(
-              "java",
-              "-Djava.io.tmpdir=tmp",
-              "-Dyarn.appId=$" + EnvKeys.YARN_APP_ID_STR,
-              "-Dtwill.app=$" + EnvKeys.TWILL_APP_NAME,
-              "-cp", Constants.Files.LAUNCHER_JAR + ":$HADOOP_CONF_DIR",
-              // Heap size is the AM memory minus the memory reserved for non-heap usage.
-              "-Xmx" + (Constants.APP_MASTER_MEMORY_MB - Constants.APP_MASTER_RESERVED_MEMORY_MB) + "m",
-              vmOpts,
-              TwillLauncher.class.getName(),
-              Constants.Files.APP_MASTER_JAR,
-              ApplicationMasterMain.class.getName(),
-              Boolean.FALSE.toString())
-            .redirectOutput(Constants.STDOUT)
-            .redirectError(Constants.STDERR)
-            .launch();
-        }
-      };
-
-      YarnTwillController controller = controllerFactory.create(runId, logHandlers, submitTask);
-      controller.start();
-      return controller;
-    } catch (Exception e) {
-      LOG.error("Failed to submit application {}", twillSpec.getName(), e);
-      throw Throwables.propagate(e);
-    }
-  }
-
-  /**
-   * Builds the initial credentials: those of the current user plus any delegation tokens obtained through
-   * {@link YarnUtils#addDelegationTokens}. An {@link IOException} is logged and whatever was gathered so far
-   * is returned.
-   */
-  private Credentials createCredentials() {
-    Credentials gathered = new Credentials();
-
-    try {
-      Credentials current = UserGroupInformation.getCurrentUser().getCredentials();
-      gathered.addAll(current);
-
-      for (Token<?> token : YarnUtils.addDelegationTokens(yarnConfig, locationFactory, gathered)) {
-        LOG.debug("Delegation token acquired for {}, {}", locationFactory.getHomeLocation().toURI(), token);
-      }
-    } catch (IOException e) {
-      LOG.warn("Failed to check for secure login type. Not gathering any delegation token.", e);
-    }
-
-    return gathered;
-  }
-
-  /**
-   * Creates a fresh {@link ApplicationBundler} constructed with an empty string list.
-   */
-  private ApplicationBundler createBundler() {
-    ImmutableList<String> emptyList = ImmutableList.of();
-    return new ApplicationBundler(emptyList);
-  }
-
-  private LocalFile createLocalFile(String name, Location location) throws IOException {
-    return createLocalFile(name, location, false);
-  }
-
-  /**
-   * Describes the given location as a {@link LocalFile}, capturing its URI, last-modified time and length.
-   * No pattern is set.
-   *
-   * @param archive value of the archive flag recorded on the resulting {@link LocalFile}
-   */
-  private LocalFile createLocalFile(String name, Location location, boolean archive) throws IOException {
-    URI uri = location.toURI();
-    long lastModified = location.lastModified();
-    long size = location.length();
-    return new DefaultLocalFile(name, uri, lastModified, size, archive, null);
-  }
-
-  private void createAppMasterJar(ApplicationBundler bundler, Map<String, LocalFile> localFiles) throws IOException {
-    try {
-      LOG.debug("Create and copy {}", Constants.Files.APP_MASTER_JAR);
-      Location location = createTempLocation(Constants.Files.APP_MASTER_JAR);
-
-      List<Class<?>> classes = Lists.newArrayList();
-      classes.add(ApplicationMasterMain.class);
-
-      // Stuck in the yarnAppClient class to make bundler being able to pickup the right yarn-client version
-      classes.add(yarnAppClient.getClass());
-
-      // Add the TwillRunnableEventHandler class
-      if (twillSpec.getEventHandler() != null) {
-        classes.add(getClassLoader().loadClass(twillSpec.getEventHandler().getClassName()));
-      }
-
-      bundler.createBundle(location, classes);
-      LOG.debug("Done {}", Constants.Files.APP_MASTER_JAR);
-
-      localFiles.put(Constants.Files.APP_MASTER_JAR, createLocalFile(Constants.Files.APP_MASTER_JAR, location));
-    } catch (ClassNotFoundException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  private void createContainerJar(ApplicationBundler bundler, Map<String, LocalFile> localFiles) throws IOException {
-    try {
-      Set<Class<?>> classes = Sets.newIdentityHashSet();
-      classes.add(TwillContainerMain.class);
-      classes.addAll(dependencies);
-
-      ClassLoader classLoader = getClassLoader();
-      for (RuntimeSpecification spec : twillSpec.getRunnables().values()) {
-        classes.add(classLoader.loadClass(spec.getRunnableSpecification().getClassName()));
-      }
-
-      LOG.debug("Create and copy {}", Constants.Files.CONTAINER_JAR);
-      Location location = createTempLocation(Constants.Files.CONTAINER_JAR);
-      bundler.createBundle(location, classes, resources);
-      LOG.debug("Done {}", Constants.Files.CONTAINER_JAR);
-
-      localFiles.put(Constants.Files.CONTAINER_JAR, createLocalFile(Constants.Files.CONTAINER_JAR, location));
-
-    } catch (ClassNotFoundException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  /**
-   * Resolves the {@link LocalFile}s declared by each runnable of the given {@link TwillSpecification}.
-   * Files with an {@code hdfs} URI are referenced in place; all others are copied to a temp location
-   * under the application directory.
-   *
-   * @param twillSpec The {@link TwillSpecification} whose runnables are inspected.
-   * @param localFiles A Multimap that receives, per runnable name, the resolved LocalFiles.
-   * @throws IOException if copying a file or reading the metadata of a location fails
-   */
-  private void populateRunnableLocalFiles(TwillSpecification twillSpec,
-                                          Multimap<String, LocalFile> localFiles) throws IOException {
-    LOG.debug("Populating Runnable LocalFiles");
-
-    for (Map.Entry<String, RuntimeSpecification> runnable : twillSpec.getRunnables().entrySet()) {
-      String runnableName = runnable.getKey();
-
-      for (LocalFile declared : runnable.getValue().getLocalFiles()) {
-        URI uri = declared.getURI();
-        boolean alreadyOnHdfs = "hdfs".equals(uri.getScheme());
-
-        Location location;
-        if (alreadyOnHdfs) {
-          // This assumes an HDFS location factory; with any other factory this fails, which is intended.
-          location = locationFactory.create(uri);
-        } else {
-          URL url = uri.toURL();
-          LOG.debug("Create and copy {} : {}", runnableName, url);
-          // Keep the original suffix on the target name so that archives can still be expanded.
-          String targetName = Paths.appendSuffix(url.getFile(), declared.getName());
-          location = copyFromURL(url, createTempLocation(targetName));
-          LOG.debug("Done {} : {}", runnableName, url);
-        }
-
-        LocalFile resolved = new DefaultLocalFile(declared.getName(), location.toURI(), location.lastModified(),
-                                                  location.length(), declared.isArchive(), declared.getPattern());
-        localFiles.put(runnableName, resolved);
-      }
-    }
-
-    LOG.debug("Done Runnable LocalFiles");
-  }
-
-  /**
-   * Writes the specification as JSON to a temp location and registers it in {@code localFiles}.
-   * Each runnable's local files are replaced by the entries of {@code runnableLocalFiles}, and a
-   * {@link LogOnlyEventHandler} specification is substituted when the spec has no event handler.
-   */
-  private void saveSpecification(TwillSpecification spec, final Multimap<String, LocalFile> runnableLocalFiles,
-                                 Map<String, LocalFile> localFiles) throws IOException {
-    // Swaps the local files of each runnable for the ones recorded under its name.
-    Maps.EntryTransformer<String, RuntimeSpecification, RuntimeSpecification> rewriteLocalFiles =
-      new Maps.EntryTransformer<String, RuntimeSpecification, RuntimeSpecification>() {
-        @Override
-        public RuntimeSpecification transformEntry(String key, RuntimeSpecification value) {
-          return new DefaultRuntimeSpecification(value.getName(), value.getRunnableSpecification(),
-                                                 value.getResourceSpecification(), runnableLocalFiles.get(key));
-        }
-      };
-    Map<String, RuntimeSpecification> runtimeSpec = Maps.transformEntries(spec.getRunnables(), rewriteLocalFiles);
-
-    final String fileName = Constants.Files.TWILL_SPEC;
-    LOG.debug("Create and copy {}", fileName);
-    Location location = createTempLocation(fileName);
-    Writer writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8);
-    try {
-      EventHandlerSpecification eventHandler = spec.getEventHandler();
-      if (eventHandler == null) {
-        eventHandler = new LogOnlyEventHandler().configure();
-      }
-
-      TwillSpecification rewritten =
-        new DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(), eventHandler);
-      TwillSpecificationAdapter.create().toJson(rewritten, writer);
-    } finally {
-      writer.close();
-    }
-    LOG.debug("Done {}", fileName);
-
-    localFiles.put(fileName, createLocalFile(fileName, location));
-  }
-
-  /**
-   * Copies the logback template from the classpath to a temp location and registers it in {@code localFiles}.
-   *
-   * @throws NullPointerException if the template resource cannot be found on the classpath
-   * @throws IOException if the copy fails
-   */
-  private void saveLogback(Map<String, LocalFile> localFiles) throws IOException {
-    LOG.debug("Create and copy {}", Constants.Files.LOGBACK_TEMPLATE);
-    // getResource returns null for a missing resource. Fail here with a descriptive message instead of
-    // with a bare NullPointerException inside copyFromURL after the temp location has been created.
-    URL template = getClass().getClassLoader().getResource(Constants.Files.LOGBACK_TEMPLATE);
-    Preconditions.checkNotNull(template, "Resource %s not found in classpath", Constants.Files.LOGBACK_TEMPLATE);
-
-    Location location = copyFromURL(template, createTempLocation(Constants.Files.LOGBACK_TEMPLATE));
-    LOG.debug("Done {}", Constants.Files.LOGBACK_TEMPLATE);
-
-    localFiles.put(Constants.Files.LOGBACK_TEMPLATE, createLocalFile(Constants.Files.LOGBACK_TEMPLATE, location));
-  }
-
-  /**
-   * Creates the launcher.jar used for launching the main application and registers it in {@code localFiles}.
-   * The jar holds the {@link TwillLauncher} classes and, when extra classpaths were configured, an entry
-   * named {@code classpath} containing them joined by ':'.
-   *
-   * @throws IllegalArgumentException if the launcher turns out to depend on a class other than itself
-   * @throws IOException if writing the jar fails
-   */
-  private void saveLauncher(Map<String, LocalFile> localFiles) throws URISyntaxException, IOException {
-
-    LOG.debug("Create and copy {}", Constants.Files.LAUNCHER_JAR);
-    Location location = createTempLocation(Constants.Files.LAUNCHER_JAR);
-
-    final String launcherName = TwillLauncher.class.getName();
-
-    final JarOutputStream jarOut = new JarOutputStream(location.getOutputStream());
-    // Everything that writes to the jar runs inside the try, so the stream is closed even if the
-    // dependency scan fails.
-    boolean completed = false;
-    try {
-      Dependencies.findClassDependencies(getClassLoader(), new Dependencies.ClassAcceptor() {
-        @Override
-        public boolean accept(String className, URL classUrl, URL classPathUrl) {
-          Preconditions.checkArgument(className.startsWith(launcherName),
-                                      "Launcher jar should not have dependencies: %s", className);
-          try {
-            jarOut.putNextEntry(new JarEntry(className.replace('.', '/') + ".class"));
-            InputStream is = classUrl.openStream();
-            try {
-              ByteStreams.copy(is, jarOut);
-            } finally {
-              is.close();
-            }
-          } catch (IOException e) {
-            throw Throwables.propagate(e);
-          }
-          return true;
-        }
-      }, launcherName);
-
-      if (!classPaths.isEmpty()) {
-        jarOut.putNextEntry(new JarEntry("classpath"));
-        jarOut.write(Joiner.on(':').join(classPaths).getBytes(Charsets.UTF_8));
-      }
-      completed = true;
-    } finally {
-      if (completed) {
-        jarOut.close();
-      } else {
-        // An earlier failure is already propagating; do not let a close failure (for example an
-        // empty-archive error) replace it.
-        try {
-          jarOut.close();
-        } catch (IOException e) {
-          LOG.debug("Failed to close {} after an earlier failure", Constants.Files.LAUNCHER_JAR, e);
-        }
-      }
-    }
-    LOG.debug("Done {}", Constants.Files.LAUNCHER_JAR);
-
-    localFiles.put(Constants.Files.LAUNCHER_JAR, createLocalFile(Constants.Files.LAUNCHER_JAR, location));
-  }
-
-  /**
-   * Copies the Kafka archive from the classpath to a temp location and registers it in {@code localFiles}
-   * with the archive flag set.
-   *
-   * @throws NullPointerException if the Kafka archive resource cannot be found on the classpath
-   * @throws IOException if the copy fails
-   */
-  private void saveKafka(Map<String, LocalFile> localFiles) throws IOException {
-    LOG.debug("Copy {}", Constants.Files.KAFKA);
-    // getResource returns null for a missing resource. Fail here with a descriptive message instead of
-    // with a bare NullPointerException inside copyFromURL after the temp location has been created.
-    URL archive = getClass().getClassLoader().getResource(KAFKA_ARCHIVE);
-    Preconditions.checkNotNull(archive, "Resource %s not found in classpath", KAFKA_ARCHIVE);
-
-    Location location = copyFromURL(archive, createTempLocation(Constants.Files.KAFKA));
-    LOG.debug("Done {}", Constants.Files.KAFKA);
-
-    localFiles.put(Constants.Files.KAFKA, createLocalFile(Constants.Files.KAFKA, location, true));
-  }
-
-  /**
-   * Writes the JVM options string to a temp location and registers it in {@code localFiles}.
-   * Nothing is written or registered when the options string is empty.
-   */
-  private void saveVmOptions(String opts, Map<String, LocalFile> localFiles) throws IOException {
-    if (opts.isEmpty()) {
-      // Without any vm options there is no file to localize.
-      return;
-    }
-
-    final String fileName = Constants.Files.JVM_OPTIONS;
-    LOG.debug("Copy {}", fileName);
-    final Location location = createTempLocation(fileName);
-    OutputSupplier<Writer> utf8Writer = new OutputSupplier<Writer>() {
-      @Override
-      public Writer getOutput() throws IOException {
-        return new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8);
-      }
-    };
-    CharStreams.write(opts, utf8Writer);
-    LOG.debug("Done {}", fileName);
-
-    localFiles.put(fileName, createLocalFile(fileName, location));
-  }
-
-  /**
-   * Encodes the given arguments with {@link ArgumentsCodec} into a temp location and registers it in
-   * {@code localFiles}.
-   */
-  private void saveArguments(Arguments arguments, Map<String, LocalFile> localFiles) throws IOException {
-    final String fileName = Constants.Files.ARGUMENTS;
-    LOG.debug("Create and copy {}", fileName);
-    final Location location = createTempLocation(fileName);
-    OutputSupplier<Writer> utf8Writer = new OutputSupplier<Writer>() {
-      @Override
-      public Writer getOutput() throws IOException {
-        return new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8);
-      }
-    };
-    ArgumentsCodec.encode(arguments, utf8Writer);
-    LOG.debug("Done {}", fileName);
-
-    localFiles.put(fileName, createLocalFile(fileName, location));
-  }
-
-  /**
-   * Serializes the list of files that needs to localize from AM to Container.
-   */
-  private void saveLocalFiles(Map<String, LocalFile> localFiles, Set<String> includes) throws IOException {
-    Map<String, LocalFile> localize = ImmutableMap.copyOf(Maps.filterKeys(localFiles, Predicates.in(includes)));
-    LOG.debug("Create and copy {}", Constants.Files.LOCALIZE_FILES);
-    Location location = createTempLocation(Constants.Files.LOCALIZE_FILES);
-    Writer writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8);
-    try {
-      new GsonBuilder().registerTypeAdapter(LocalFile.class, new LocalFileCodec())
-        .create().toJson(localize.values(), new TypeToken<List<LocalFile>>() {
-      }.getType(), writer);
-    } finally {
-      writer.close();
-    }
-    LOG.debug("Done {}", Constants.Files.LOCALIZE_FILES);
-    localFiles.put(Constants.Files.LOCALIZE_FILES, createLocalFile(Constants.Files.LOCALIZE_FILES, location));
-  }
-
-  /**
-   * Copies the content behind the given URL into the target location.
-   *
-   * @return the {@code target} location, for call chaining
-   * @throws IOException if opening either stream or copying fails
-   */
-  private Location copyFromURL(URL url, Location target) throws IOException {
-    InputStream source = url.openStream();
-    try {
-      OutputStream sink = new BufferedOutputStream(target.getOutputStream());
-      try {
-        ByteStreams.copy(source, sink);
-      } finally {
-        // Closing also flushes the buffered bytes to the target.
-        sink.close();
-      }
-    } finally {
-      source.close();
-    }
-
-    return target;
-  }
-
-  /**
-   * Creates a temp file location under the application directory for the given file name, keeping the
-   * file extension (if any) as the suffix of the temp file.
-   *
-   * @param fileName name of the file, with or without an extension
-   * @throws RuntimeException (via {@link Throwables#propagate}) if the temp location cannot be obtained
-   */
-  private Location createTempLocation(String fileName) {
-    String suffix = Paths.getExtension(fileName);
-
-    String name;
-    String tempSuffix;
-    if (suffix.isEmpty()) {
-      // NOTE(review): assumes Paths.getExtension returns "" for a name without extension - confirm.
-      // Without this guard the last character of the name was dropped and the suffix became a bare ".".
-      name = fileName;
-      tempSuffix = "";
-    } else {
-      // Strip ".<suffix>" from the end of the name.
-      name = fileName.substring(0, fileName.length() - suffix.length() - 1);
-      tempSuffix = '.' + suffix;
-    }
-
-    try {
-      return getAppLocation().append(name).getTempFile(tempSuffix);
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  /**
-   * Returns the location {@code /<application name>/<run id>} created through the location factory.
-   */
-  private Location getAppLocation() {
-    String appPath = String.format("/%s/%s", twillSpec.getName(), runId.getId());
-    return locationFactory.create(appPath);
-  }
-
-  /**
-   * Picks the ClassLoader used for loading classes: the thread context ClassLoader when one is set,
-   * otherwise the ClassLoader that loaded this class.
-   */
-  private ClassLoader getClassLoader() {
-    ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
-    if (contextLoader != null) {
-      return contextLoader;
-    }
-    return getClass().getClassLoader();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
deleted file mode 100644
index 9335465..0000000
--- a/yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ /dev/null
@@ -1,583 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.SecureStore;
-import org.apache.twill.api.SecureStoreUpdater;
-import org.apache.twill.api.TwillApplication;
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.TwillPreparer;
-import org.apache.twill.api.TwillRunnable;
-import org.apache.twill.api.TwillRunnerService;
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.api.logging.LogHandler;
-import org.apache.twill.common.Cancellable;
-import org.apache.twill.common.ServiceListenerAdapter;
-import org.apache.twill.common.Threads;
-import org.apache.twill.filesystem.HDFSLocationFactory;
-import org.apache.twill.filesystem.Location;
-import org.apache.twill.filesystem.LocationFactory;
-import org.apache.twill.internal.Constants;
-import org.apache.twill.internal.ProcessController;
-import org.apache.twill.internal.RunIds;
-import org.apache.twill.internal.SingleRunnableApplication;
-import org.apache.twill.internal.appmaster.ApplicationMasterLiveNodeData;
-import org.apache.twill.internal.yarn.VersionDetectYarnAppClientFactory;
-import org.apache.twill.internal.yarn.YarnAppClient;
-import org.apache.twill.internal.yarn.YarnApplicationReport;
-import org.apache.twill.internal.yarn.YarnUtils;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClient;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
-import org.apache.twill.zookeeper.ZKOperations;
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Suppliers;
-import com.google.common.base.Throwables;
-import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableTable;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Table;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.Callables;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * An implementation of {@link org.apache.twill.api.TwillRunnerService} that runs application on a YARN cluster.
- */
-public final class YarnTwillRunnerService extends AbstractIdleService implements TwillRunnerService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(YarnTwillRunnerService.class);
-
-  private static final int ZK_TIMEOUT = 10000;
-  private static final Function<String, RunId> STRING_TO_RUN_ID = new Function<String, RunId>() {
-    @Override
-    public RunId apply(String input) {
-      return RunIds.fromString(input);
-    }
-  };
-  private static final Function<YarnTwillController, TwillController> CAST_CONTROLLER =
-    new Function<YarnTwillController, TwillController>() {
-    @Override
-    public TwillController apply(YarnTwillController controller) {
-      return controller;
-    }
-  };
-
-  private final YarnConfiguration yarnConfig;
-  private final YarnAppClient yarnAppClient;
-  private final ZKClientService zkClientService;
-  private final LocationFactory locationFactory;
-  private final Table<String, RunId, YarnTwillController> controllers;
-  private ScheduledExecutorService secureStoreScheduler;
-
-  private Iterable<LiveInfo> liveInfos;
-  private Cancellable watchCancellable;
-  private volatile String jvmOptions = "";
-
-  public YarnTwillRunnerService(YarnConfiguration config, String zkConnect) {
-    this(config, zkConnect, new HDFSLocationFactory(getFileSystem(config), "/twill"));
-  }
-
-  public YarnTwillRunnerService(YarnConfiguration config, String zkConnect, LocationFactory locationFactory) {
-    this.yarnConfig = config;
-    this.yarnAppClient = new VersionDetectYarnAppClientFactory().create(config);
-    this.locationFactory = locationFactory;
-    this.zkClientService = getZKClientService(zkConnect);
-    this.controllers = HashBasedTable.create();
-  }
-
-  /**
-   * This methods sets the extra JVM options that will be passed to the java command line for every application
-   * started through this {@link YarnTwillRunnerService} instance. It only affects applications that are started
-   * after options is set.
-   *
-   * This is intended for advance usage. All options will be passed unchanged to the java command line. Invalid
-   * options could cause application not able to start.
-   *
-   * @param options extra JVM options.
-   */
-  public void setJVMOptions(String options) {
-    Preconditions.checkArgument(options != null, "JVM options cannot be null.");
-    this.jvmOptions = options;
-  }
-
-  @Override
-  public Cancellable scheduleSecureStoreUpdate(final SecureStoreUpdater updater,
-                                               long initialDelay, long delay, TimeUnit unit) {
-    if (!UserGroupInformation.isSecurityEnabled()) {
-      return new Cancellable() {
-        @Override
-        public void cancel() {
-          // No-op
-        }
-      };
-    }
-
-    synchronized (this) {
-      if (secureStoreScheduler == null) {
-        secureStoreScheduler = Executors.newSingleThreadScheduledExecutor(
-          Threads.createDaemonThreadFactory("secure-store-updater"));
-      }
-    }
-
-    final ScheduledFuture<?> future = secureStoreScheduler.scheduleWithFixedDelay(new Runnable() {
-      @Override
-      public void run() {
-        // Collects all <application, runId> pairs first
-        Multimap<String, RunId> liveApps = HashMultimap.create();
-        synchronized (YarnTwillRunnerService.this) {
-          for (Table.Cell<String, RunId, YarnTwillController> cell : controllers.cellSet()) {
-            liveApps.put(cell.getRowKey(), cell.getColumnKey());
-          }
-        }
-
-        // Collect all secure stores that needs to be updated.
-        Table<String, RunId, SecureStore> secureStores = HashBasedTable.create();
-        for (Map.Entry<String, RunId> entry : liveApps.entries()) {
-          try {
-            secureStores.put(entry.getKey(), entry.getValue(), updater.update(entry.getKey(), entry.getValue()));
-          } catch (Throwable t) {
-            LOG.warn("Exception thrown by SecureStoreUpdater {}", updater, t);
-          }
-        }
-
-        // Update secure stores.
-        updateSecureStores(secureStores);
-      }
-    }, initialDelay, delay, unit);
-
-    return new Cancellable() {
-      @Override
-      public void cancel() {
-        future.cancel(false);
-      }
-    };
-  }
-
-  @Override
-  public TwillPreparer prepare(TwillRunnable runnable) {
-    return prepare(runnable, ResourceSpecification.BASIC);
-  }
-
-  @Override
-  public TwillPreparer prepare(TwillRunnable runnable, ResourceSpecification resourceSpecification) {
-    return prepare(new SingleRunnableApplication(runnable, resourceSpecification));
-  }
-
-  @Override
-  public TwillPreparer prepare(TwillApplication application) {
-    Preconditions.checkState(isRunning(), "Service not start. Please call start() first.");
-    final TwillSpecification twillSpec = application.configure();
-    final String appName = twillSpec.getName();
-
-    return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService, locationFactory,
-                                 Suppliers.ofInstance(jvmOptions),
-                                 new YarnTwillControllerFactory() {
-      @Override
-      public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
-                                        Callable<ProcessController<YarnApplicationReport>> startUp) {
-        ZKClient zkClient = ZKClients.namespace(zkClientService, "/" + appName);
-        YarnTwillController controller = listenController(new YarnTwillController(runId, zkClient,
-                                                                                  logHandlers, startUp));
-        synchronized (YarnTwillRunnerService.this) {
-          Preconditions.checkArgument(!controllers.contains(appName, runId),
-                                      "Application %s with runId %s is already running.", appName, runId);
-          controllers.put(appName, runId, controller);
-        }
-        return controller;
-      }
-    });
-  }
-
-  @Override
-  public synchronized TwillController lookup(String applicationName, final RunId runId) {
-    return controllers.get(applicationName, runId);
-  }
-
-  @Override
-  public Iterable<TwillController> lookup(final String applicationName) {
-    return new Iterable<TwillController>() {
-      @Override
-      public Iterator<TwillController> iterator() {
-        synchronized (YarnTwillRunnerService.this) {
-          return Iterators.transform(ImmutableList.copyOf(controllers.row(applicationName).values()).iterator(),
-                                     CAST_CONTROLLER);
-        }
-      }
-    };
-  }
-
-  @Override
-  public Iterable<LiveInfo> lookupLive() {
-    return liveInfos;
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    yarnAppClient.startAndWait();
-    zkClientService.startAndWait();
-
-    // Create the root node, so that the namespace root would get created if it is missing
-    // If the exception is caused by node exists, then it's ok. Otherwise propagate the exception.
-    ZKOperations.ignoreError(zkClientService.create("/", null, CreateMode.PERSISTENT),
-                             KeeperException.NodeExistsException.class, null).get();
-
-    watchCancellable = watchLiveApps();
-    liveInfos = createLiveInfos();
-
-    // Schedule an updater for updating HDFS delegation tokens
-    if (UserGroupInformation.isSecurityEnabled()) {
-      long delay = yarnConfig.getLong(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
-                                      DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
-      scheduleSecureStoreUpdate(new LocationSecureStoreUpdater(yarnConfig, locationFactory),
-                                delay, delay, TimeUnit.MILLISECONDS);
-    }
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    // Shutdown shouldn't stop any controllers, as stopping this client service should let the remote containers
-    // running. However, this assumes that this TwillRunnerService is a long running service and you only stop it
-    // when the JVM process is about to exit. Hence it is important that threads created in the controllers are
-    // daemon threads.
-    synchronized (this) {
-      if (secureStoreScheduler != null) {
-        secureStoreScheduler.shutdownNow();
-      }
-    }
-    watchCancellable.cancel();
-    zkClientService.stopAndWait();
-    yarnAppClient.stopAndWait();
-  }
-
-  private Cancellable watchLiveApps() {
-    final Map<String, Cancellable> watched = Maps.newConcurrentMap();
-
-    final AtomicBoolean cancelled = new AtomicBoolean(false);
-    // Watch child changes in the root, which gives all application names.
-    final Cancellable cancellable = ZKOperations.watchChildren(zkClientService, "/",
-                                                               new ZKOperations.ChildrenCallback() {
-      @Override
-      public void updated(NodeChildren nodeChildren) {
-        if (cancelled.get()) {
-          return;
-        }
-
-        Set<String> apps = ImmutableSet.copyOf(nodeChildren.getChildren());
-
-        // For each for the application name, watch for ephemeral nodes under /instances.
-        for (final String appName : apps) {
-          if (watched.containsKey(appName)) {
-            continue;
-          }
-
-          final String instancePath = String.format("/%s/instances", appName);
-          watched.put(appName,
-                      ZKOperations.watchChildren(zkClientService, instancePath, new ZKOperations.ChildrenCallback() {
-            @Override
-            public void updated(NodeChildren nodeChildren) {
-              if (cancelled.get()) {
-                return;
-              }
-              if (nodeChildren.getChildren().isEmpty()) {     // No more child, means no live instances
-                Cancellable removed = watched.remove(appName);
-                if (removed != null) {
-                  removed.cancel();
-                }
-                return;
-              }
-              synchronized (YarnTwillRunnerService.this) {
-                // For each of the children, which the node name is the runId,
-                // fetch the application Id and construct TwillController.
-                for (final RunId runId : Iterables.transform(nodeChildren.getChildren(), STRING_TO_RUN_ID)) {
-                  if (controllers.contains(appName, runId)) {
-                    continue;
-                  }
-                  updateController(appName, runId, cancelled);
-                }
-              }
-            }
-          }));
-        }
-
-        // Remove app watches for apps that are gone. Removal of controller from controllers table is done
-        // in the state listener attached to the twill controller.
-        for (String removeApp : Sets.difference(watched.keySet(), apps)) {
-          watched.remove(removeApp).cancel();
-        }
-      }
-    });
-    return new Cancellable() {
-      @Override
-      public void cancel() {
-        cancelled.set(true);
-        cancellable.cancel();
-        for (Cancellable c : watched.values()) {
-          c.cancel();
-        }
-      }
-    };
-  }
-
-  private YarnTwillController listenController(final YarnTwillController controller) {
-    controller.addListener(new ServiceListenerAdapter() {
-      @Override
-      public void terminated(State from) {
-        removeController();
-      }
-
-      @Override
-      public void failed(State from, Throwable failure) {
-        removeController();
-      }
-
-      private void removeController() {
-        synchronized (YarnTwillRunnerService.this) {
-          Iterables.removeIf(controllers.values(),
-                             new Predicate<TwillController>() {
-             @Override
-             public boolean apply(TwillController input) {
-               return input == controller;
-             }
-           });
-        }
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-    return controller;
-  }
-
-  private ZKClientService getZKClientService(String zkConnect) {
-    return ZKClientServices.delegate(
-      ZKClients.reWatchOnExpire(
-        ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnect)
-                                   .setSessionTimeout(ZK_TIMEOUT)
-                                   .build(), RetryStrategies.exponentialDelay(100, 2000, TimeUnit.MILLISECONDS))));
-  }
-
-  private Iterable<LiveInfo> createLiveInfos() {
-    return new Iterable<LiveInfo>() {
-
-      @Override
-      public Iterator<LiveInfo> iterator() {
-        Map<String, Map<RunId, YarnTwillController>> controllerMap = ImmutableTable.copyOf(controllers).rowMap();
-        return Iterators.transform(controllerMap.entrySet().iterator(),
-                                   new Function<Map.Entry<String, Map<RunId, YarnTwillController>>, LiveInfo>() {
-          @Override
-          public LiveInfo apply(final Map.Entry<String, Map<RunId, YarnTwillController>> entry) {
-            return new LiveInfo() {
-              @Override
-              public String getApplicationName() {
-                return entry.getKey();
-              }
-
-              @Override
-              public Iterable<TwillController> getControllers() {
-                return Iterables.transform(entry.getValue().values(), CAST_CONTROLLER);
-              }
-            };
-          }
-        });
-      }
-    };
-  }
-
-  private void updateController(final String appName, final RunId runId, final AtomicBoolean cancelled) {
-    String instancePath = String.format("/%s/instances/%s", appName, runId.getId());
-
-    // Fetch the content node.
-    Futures.addCallback(zkClientService.getData(instancePath), new FutureCallback<NodeData>() {
-      @Override
-      public void onSuccess(NodeData result) {
-        if (cancelled.get()) {
-          return;
-        }
-        ApplicationId appId = getApplicationId(result);
-        if (appId == null) {
-          return;
-        }
-
-        synchronized (YarnTwillRunnerService.this) {
-          if (!controllers.contains(appName, runId)) {
-            ZKClient zkClient = ZKClients.namespace(zkClientService, "/" + appName);
-            YarnTwillController controller = listenController(
-              new YarnTwillController(runId, zkClient,
-                                      Callables.returning(yarnAppClient.createProcessController(appId))));
-            controllers.put(appName, runId, controller);
-            controller.start();
-          }
-        }
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        LOG.warn("Failed in fetching application instance node.", t);
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-  }
-
-
-  /**
-   * Decodes application ID stored inside the node data.
-   * @param nodeData The node data to decode from. If it is {@code null}, this method would return {@code null}.
-   * @return The ApplicationId or {@code null} if failed to decode.
-   */
-  private ApplicationId getApplicationId(NodeData nodeData) {
-    byte[] data = nodeData == null ? null : nodeData.getData();
-    if (data == null) {
-      return null;
-    }
-
-    Gson gson = new Gson();
-    JsonElement json = gson.fromJson(new String(data, Charsets.UTF_8), JsonElement.class);
-    if (!json.isJsonObject()) {
-      LOG.warn("Unable to decode live data node.");
-      return null;
-    }
-
-    JsonObject jsonObj = json.getAsJsonObject();
-    json = jsonObj.get("data");
-    if (!json.isJsonObject()) {
-      LOG.warn("Property data not found in live data node.");
-      return null;
-    }
-
-    try {
-      ApplicationMasterLiveNodeData amLiveNode = gson.fromJson(json, ApplicationMasterLiveNodeData.class);
-      return YarnUtils.createApplicationId(amLiveNode.getAppIdClusterTime(), amLiveNode.getAppId());
-    } catch (Exception e) {
-      LOG.warn("Failed to decode application live node data.", e);
-      return null;
-    }
-  }
-
-  private void updateSecureStores(Table<String, RunId, SecureStore> secureStores) {
-    for (Table.Cell<String, RunId, SecureStore> cell : secureStores.cellSet()) {
-      Object store = cell.getValue().getStore();
-      if (!(store instanceof Credentials)) {
-        LOG.warn("Only Hadoop Credentials is supported. Ignore update for {}.", cell);
-        continue;
-      }
-
-      Credentials credentials = (Credentials) store;
-      if (credentials.getAllTokens().isEmpty()) {
-        // Nothing to update.
-        continue;
-      }
-
-      try {
-        updateCredentials(cell.getRowKey(), cell.getColumnKey(), credentials);
-        synchronized (YarnTwillRunnerService.this) {
-          // Notify the application for secure store updates if it is still running.
-          YarnTwillController controller = controllers.get(cell.getRowKey(), cell.getColumnKey());
-          if (controller != null) {
-            controller.secureStoreUpdated();
-          }
-        }
-      } catch (Throwable t) {
-        LOG.warn("Failed to update secure store for {}.", cell, t);
-      }
-    }
-  }
-
-  private void updateCredentials(String application, RunId runId, Credentials updates) throws IOException {
-    Location credentialsLocation = locationFactory.create(String.format("/%s/%s/%s", application, runId.getId(),
-                                                                        Constants.Files.CREDENTIALS));
-    // Try to read the old credentials.
-    Credentials credentials = new Credentials();
-    if (credentialsLocation.exists()) {
-      DataInputStream is = new DataInputStream(new BufferedInputStream(credentialsLocation.getInputStream()));
-      try {
-        credentials.readTokenStorageStream(is);
-      } finally {
-        is.close();
-      }
-    }
-
-    // Overwrite with the updates.
-    credentials.addAll(updates);
-
-    // Overwrite the credentials.
-    Location tmpLocation = credentialsLocation.getTempFile(Constants.Files.CREDENTIALS);
-
-    // Save the credentials store with user-only permission.
-    DataOutputStream os = new DataOutputStream(new BufferedOutputStream(tmpLocation.getOutputStream("600")));
-    try {
-      credentials.writeTokenStorageToStream(os);
-    } finally {
-      os.close();
-    }
-
-    // Rename the tmp file into the credentials location
-    tmpLocation.renameTo(credentialsLocation);
-
-    LOG.debug("Secure store for {} {} saved to {}.", application, runId, credentialsLocation.toURI());
-  }
-
-  private static FileSystem getFileSystem(YarnConfiguration configuration) {
-    try {
-      return FileSystem.get(configuration);
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/yarn/package-info.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/yarn/package-info.java b/yarn/src/main/java/org/apache/twill/yarn/package-info.java
deleted file mode 100644
index b3cbc5e..0000000
--- a/yarn/src/main/java/org/apache/twill/yarn/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Classes in this package implement the Twill API for Apache Hadoop YARN.
- */
-package org.apache.twill.yarn;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/resources/logback-template.xml
----------------------------------------------------------------------
diff --git a/yarn/src/main/resources/logback-template.xml b/yarn/src/main/resources/logback-template.xml
deleted file mode 100644
index 38cf6c8..0000000
--- a/yarn/src/main/resources/logback-template.xml
+++ /dev/null
@@ -1,11 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!-- Default logback configuration for twill library -->
-<configuration>
-
-    <logger name="org.apache.hadoop" level="WARN" />
-    <logger name="org.apache.zookeeper" level="WARN" />
-
-    <root level="INFO" />
-
-</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/BuggyServer.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/BuggyServer.java b/yarn/src/test/java/org/apache/twill/yarn/BuggyServer.java
deleted file mode 100644
index bb1a583..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/BuggyServer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.PrintWriter;
-
-/**
- * Server for testing that will die if you give it a 0.
- */
-public final class BuggyServer extends SocketServer {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BuggyServer.class);
-
-  @Override
-  public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
-    String line = reader.readLine();
-    LOG.info("Received: " + line + " going to divide by it");
-    Integer toDivide = Integer.valueOf(line);
-    writer.println(Integer.toString(100 / toDivide));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/DistributeShellTestRun.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/DistributeShellTestRun.java b/yarn/src/test/java/org/apache/twill/yarn/DistributeShellTestRun.java
deleted file mode 100644
index 1054ec9..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/DistributeShellTestRun.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.TwillRunner;
-import org.apache.twill.api.logging.PrinterLogHandler;
-import org.apache.twill.common.ServiceListenerAdapter;
-import org.apache.twill.common.Threads;
-import com.google.common.util.concurrent.Service;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.PrintWriter;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * This test is executed by {@link YarnTestSuite}.
- */
-public class DistributeShellTestRun {
-
-  @Ignore
-  @Test
-  public void testDistributedShell() throws InterruptedException {
-    TwillRunner twillRunner = YarnTestSuite.getTwillRunner();
-
-    TwillController controller = twillRunner.prepare(new DistributedShell("pwd", "ls -al"))
-                                            .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out)))
-                                            .start();
-
-    final CountDownLatch stopLatch = new CountDownLatch(1);
-    controller.addListener(new ServiceListenerAdapter() {
-
-      @Override
-      public void terminated(Service.State from) {
-        stopLatch.countDown();
-      }
-
-      @Override
-      public void failed(Service.State from, Throwable failure) {
-        stopLatch.countDown();
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-
-    Assert.assertTrue(stopLatch.await(10, TimeUnit.SECONDS));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/DistributedShell.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/DistributedShell.java b/yarn/src/test/java/org/apache/twill/yarn/DistributedShell.java
deleted file mode 100644
index c89371c..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/DistributedShell.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.AbstractTwillRunnable;
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-/**
- *
- */
-public final class DistributedShell extends AbstractTwillRunnable {
-
-  private static final Logger LOG = LoggerFactory.getLogger(DistributedShell.class);
-
-  public DistributedShell(String...commands) {
-    super(ImmutableMap.of("cmds", Joiner.on(';').join(commands)));
-  }
-
-  @Override
-  public void run() {
-    for (String cmd : Splitter.on(';').split(getArgument("cmds"))) {
-      try {
-        Process process = new ProcessBuilder(ImmutableList.copyOf(Splitter.on(' ').split(cmd)))
-                              .redirectErrorStream(true).start();
-        BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), Charsets.US_ASCII));
-        try {
-          String line = reader.readLine();
-          while (line != null) {
-            LOG.info(line);
-            line = reader.readLine();
-          }
-        } finally {
-          reader.close();
-        }
-      } catch (IOException e) {
-        LOG.error("Fail to execute command " + cmd, e);
-      }
-    }
-  }
-
-  @Override
-  public void stop() {
-    // No-op
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/EchoServer.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/EchoServer.java b/yarn/src/test/java/org/apache/twill/yarn/EchoServer.java
deleted file mode 100644
index 6b77e66..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/EchoServer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.Command;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.PrintWriter;
-
-/**
- * Test server that echoes back what it receives.
- */
-public final class EchoServer extends SocketServer {
-
-  private static final Logger LOG = LoggerFactory.getLogger(EchoServer.class);
-
-  @Override
-  public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
-    String line = reader.readLine();
-    LOG.info("Received: " + line);
-    if (line != null) {
-      writer.println(line);
-    }
-  }
-
-  @Override
-  public void handleCommand(Command command) throws Exception {
-    LOG.info("Command received: " + command + " " + getContext().getInstanceCount());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java b/yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
deleted file mode 100644
index d868eef..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.TwillRunner;
-import org.apache.twill.api.TwillRunnerService;
-import org.apache.twill.api.logging.PrinterLogHandler;
-import org.apache.twill.common.ServiceListenerAdapter;
-import org.apache.twill.common.Threads;
-import org.apache.twill.discovery.Discoverable;
-import com.google.common.base.Charsets;
-import com.google.common.io.LineReader;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.net.Socket;
-import java.net.URISyntaxException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Uses an echo server to test various behaviors of YarnTwillService.
- * This test is executed by {@link YarnTestSuite}.
- */
-public class EchoServerTestRun {
-
-  private static final Logger LOG = LoggerFactory.getLogger(EchoServerTestRun.class);
-
-  @Test
-  public void testEchoServer() throws InterruptedException, ExecutionException, IOException,
-    URISyntaxException, TimeoutException {
-    TwillRunner runner = YarnTestSuite.getTwillRunner();
-
-    TwillController controller = runner.prepare(new EchoServer(),
-                                                ResourceSpecification.Builder.with()
-                                                         .setVirtualCores(1)
-                                                         .setMemory(1, ResourceSpecification.SizeUnit.GIGA)
-                                                         .setInstances(2)
-                                                         .build())
-                                        .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
-                                        .withApplicationArguments("echo")
-                                        .withArguments("EchoServer", "echo2")
-                                        .start();
-
-    final CountDownLatch running = new CountDownLatch(1);
-    controller.addListener(new ServiceListenerAdapter() {
-      @Override
-      public void running() {
-        running.countDown();
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-
-    Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
-
-    Iterable<Discoverable> echoServices = controller.discoverService("echo");
-    Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 2, 60));
-
-    for (Discoverable discoverable : echoServices) {
-      String msg = "Hello: " + discoverable.getSocketAddress();
-
-      Socket socket = new Socket(discoverable.getSocketAddress().getAddress(),
-                                 discoverable.getSocketAddress().getPort());
-      try {
-        PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
-        LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
-
-        writer.println(msg);
-        Assert.assertEquals(msg, reader.readLine());
-      } finally {
-        socket.close();
-      }
-    }
-
-    // Increase number of instances
-    controller.changeInstances("EchoServer", 3);
-    Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 3, 60));
-
-    echoServices = controller.discoverService("echo2");
-
-    // Decrease number of instances
-    controller.changeInstances("EchoServer", 1);
-    Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 1, 60));
-
-    // Increase number of instances again
-    controller.changeInstances("EchoServer", 2);
-    Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 2, 60));
-
-    // Make sure still only one app is running
-    Iterable<TwillRunner.LiveInfo> apps = runner.lookupLive();
-    Assert.assertTrue(YarnTestSuite.waitForSize(apps, 1, 60));
-
-    // Creates a new runner service to check it can regain control over running app.
-    TwillRunnerService runnerService = YarnTestSuite.createTwillRunnerService();
-    runnerService.startAndWait();
-
-    try {
-      Iterable <TwillController> controllers = runnerService.lookup("EchoServer");
-      Assert.assertTrue(YarnTestSuite.waitForSize(controllers, 1, 60));
-
-      for (TwillController c : controllers) {
-        LOG.info("Stopping application: " + c.getRunId());
-        c.stop().get(30, TimeUnit.SECONDS);
-      }
-
-      Assert.assertTrue(YarnTestSuite.waitForSize(apps, 0, 60));
-    } finally {
-      runnerService.stopAndWait();
-    }
-
-    // Sleep a bit before exiting.
-    TimeUnit.SECONDS.sleep(2);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/EnvironmentEchoServer.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/EnvironmentEchoServer.java b/yarn/src/test/java/org/apache/twill/yarn/EnvironmentEchoServer.java
deleted file mode 100644
index 4be2472..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/EnvironmentEchoServer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.PrintWriter;
-
-/**
- * Test server that returns the value of the environment variable whose key is sent in.  Used to check
- * that the env for runnables is correctly set.
- */
-public class EnvironmentEchoServer extends SocketServer {
-
-  @Override
-  public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
-    String envKey = reader.readLine();
-    writer.println(System.getenv(envKey));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java b/yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java
deleted file mode 100644
index b3d3933..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.Command;
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.TwillRunner;
-import org.apache.twill.api.logging.PrinterLogHandler;
-import org.apache.twill.discovery.Discoverable;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Sets;
-import com.google.common.io.LineReader;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-/**
- *
- */
-public class FailureRestartTestRun {
-
-  @Test
-  public void testFailureRestart() throws Exception {
-    TwillRunner runner = YarnTestSuite.getTwillRunner();
-
-    ResourceSpecification resource = ResourceSpecification.Builder.with()
-      .setVirtualCores(1)
-      .setMemory(512, ResourceSpecification.SizeUnit.MEGA)
-      .setInstances(2)
-      .build();
-    TwillController controller = runner.prepare(new FailureRunnable(), resource)
-      .withApplicationArguments("failure")
-      .withArguments(FailureRunnable.class.getSimpleName(), "failure2")
-      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
-      .start();
-
-    Iterable<Discoverable> discoverables = controller.discoverService("failure");
-    Assert.assertTrue(YarnTestSuite.waitForSize(discoverables, 2, 60));
-
-    // Make sure we see the right instance IDs
-    Assert.assertEquals(Sets.newHashSet(0, 1), getInstances(discoverables));
-
-    // Kill server with instanceId = 0
-    controller.sendCommand(FailureRunnable.class.getSimpleName(), Command.Builder.of("kill0").build());
-
-    // Do a short sleep to make sure the runnable is killed.
-    TimeUnit.SECONDS.sleep(5);
-
-    Assert.assertTrue(YarnTestSuite.waitForSize(discoverables, 2, 60));
-    // Make sure we see the right instance IDs
-    Assert.assertEquals(Sets.newHashSet(0, 1), getInstances(discoverables));
-
-    controller.stopAndWait();
-  }
-
-  private Set<Integer> getInstances(Iterable<Discoverable> discoverables) throws IOException {
-    Set<Integer> instances = Sets.newHashSet();
-    for (Discoverable discoverable : discoverables) {
-      InetSocketAddress socketAddress = discoverable.getSocketAddress();
-      Socket socket = new Socket(socketAddress.getAddress(), socketAddress.getPort());
-      try {
-        PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
-        LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
-
-        String msg = "Failure";
-        writer.println(msg);
-
-        String line = reader.readLine();
-        Assert.assertTrue(line.endsWith(msg));
-        instances.add(Integer.parseInt(line.substring(0, line.length() - msg.length())));
-      } finally {
-        socket.close();
-      }
-    }
-    return instances;
-  }
-
-
-  public static final class FailureRunnable extends SocketServer {
-
-    private volatile boolean killed;
-
-    @Override
-    public void run() {
-      killed = false;
-      super.run();
-      if (killed) {
-        throw new RuntimeException("Exception");
-      }
-    }
-
-    @Override
-    public void handleCommand(Command command) throws Exception {
-      if (command.getCommand().equals("kill" + getContext().getInstanceId())) {
-        killed = true;
-        running = false;
-        serverSocket.close();
-      }
-    }
-
-    @Override
-    public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
-      String line = reader.readLine();
-      writer.println(getContext().getInstanceId() + line);
-      writer.flush();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java b/yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
deleted file mode 100644
index de2c74c..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.TwillApplication;
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.TwillRunner;
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.api.logging.PrinterLogHandler;
-import org.apache.twill.discovery.Discoverable;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-import com.google.common.io.LineReader;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.concurrent.TimeUnit;
-import java.util.jar.JarEntry;
-import java.util.jar.JarOutputStream;
-
-/**
- * Test for local file transfer.
- */
-public class LocalFileTestRun {
-
-  @ClassRule
-  public static TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Test
-  public void testLocalFile() throws Exception {
-    String header = Files.readFirstLine(new File(getClass().getClassLoader().getResource("header.txt").toURI()),
-                                        Charsets.UTF_8);
-
-    TwillRunner runner = YarnTestSuite.getTwillRunner();
-    if (runner instanceof YarnTwillRunnerService) {
-      ((YarnTwillRunnerService) runner).setJVMOptions("-verbose:gc -Xloggc:gc.log -XX:+PrintGCDetails");
-    }
-
-    TwillController controller = runner.prepare(new LocalFileApplication())
-      .withApplicationArguments("local")
-      .withArguments("LocalFileSocketServer", "local2")
-      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
-      .start();
-
-    if (runner instanceof YarnTwillRunnerService) {
-      ((YarnTwillRunnerService) runner).setJVMOptions("");
-    }
-
-    Iterable<Discoverable> discoverables = controller.discoverService("local");
-    Assert.assertTrue(YarnTestSuite.waitForSize(discoverables, 1, 60));
-
-    InetSocketAddress socketAddress = discoverables.iterator().next().getSocketAddress();
-    Socket socket = new Socket(socketAddress.getAddress(), socketAddress.getPort());
-    try {
-      PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
-      LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
-
-      String msg = "Local file test";
-      writer.println(msg);
-      Assert.assertEquals(header, reader.readLine());
-      Assert.assertEquals(msg, reader.readLine());
-    } finally {
-      socket.close();
-    }
-
-    controller.stopAndWait();
-
-    Assert.assertTrue(YarnTestSuite.waitForSize(discoverables, 0, 60));
-
-    TimeUnit.SECONDS.sleep(2);
-  }
-
-  public static final class LocalFileApplication implements TwillApplication {
-
-    private final File headerFile;
-
-    public LocalFileApplication() throws Exception {
-      // Create a jar file that contains the header.txt file inside.
-      headerFile = tmpFolder.newFile("header.jar");
-      JarOutputStream os = new JarOutputStream(new FileOutputStream(headerFile));
-      try {
-        os.putNextEntry(new JarEntry("header.txt"));
-        ByteStreams.copy(getClass().getClassLoader().getResourceAsStream("header.txt"), os);
-      } finally {
-        os.close();
-      }
-    }
-
-    @Override
-    public TwillSpecification configure() {
-      return TwillSpecification.Builder.with()
-        .setName("LocalFileApp")
-        .withRunnable()
-          .add(new LocalFileSocketServer())
-            .withLocalFiles()
-              .add("header", headerFile, true).apply()
-        .anyOrder()
-        .build();
-    }
-  }
-
-  public static final class LocalFileSocketServer extends SocketServer {
-
-    private static final Logger LOG = LoggerFactory.getLogger(LocalFileSocketServer.class);
-
-    @Override
-    public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
-      // Verify there is a gc.log file locally
-      Preconditions.checkState(new File("gc.log").exists());
-
-      LOG.info("handleRequest");
-      String header = Files.toString(new File("header/header.txt"), Charsets.UTF_8);
-      writer.write(header);
-      writer.println(reader.readLine());
-      LOG.info("Flushed response");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java b/yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
deleted file mode 100644
index 0598ef1..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.AbstractTwillRunnable;
-import org.apache.twill.api.EventHandler;
-import org.apache.twill.api.EventHandlerContext;
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.api.TwillApplication;
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.TwillRunner;
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.api.logging.PrinterLogHandler;
-import org.apache.twill.common.Services;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import org.junit.Test;
-
-import java.io.PrintWriter;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- *
- */
-public class ProvisionTimeoutTestRun {
-
-  @Test
-  public void testProvisionTimeout() throws InterruptedException, ExecutionException, TimeoutException {
-    TwillRunner runner = YarnTestSuite.getTwillRunner();
-
-    TwillController controller = runner.prepare(new TimeoutApplication())
-                                       .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
-                                       .start();
-
-    // The provision should fail 30 seconds after the AM has started, and the AM itself could take a while to start.
-    // Hence we allow a maximum of 90 seconds here.
-    try {
-      Services.getCompletionFuture(controller).get(90, TimeUnit.SECONDS);
-    } finally {
-      // If it times out, kill the app as cleanup.
-      controller.kill();
-    }
-  }
-
-  public static final class Handler extends EventHandler {
-
-    private boolean abort;
-
-    @Override
-    protected Map<String, String> getConfigs() {
-      return ImmutableMap.of("abort", "true");
-    }
-
-    @Override
-    public void initialize(EventHandlerContext context) {
-      this.abort = Boolean.parseBoolean(context.getSpecification().getConfigs().get("abort"));
-    }
-
-    @Override
-    public TimeoutAction launchTimeout(Iterable<TimeoutEvent> timeoutEvents) {
-      if (abort) {
-        return TimeoutAction.abort();
-      } else {
-        return TimeoutAction.recheck(10, TimeUnit.SECONDS);
-      }
-    }
-  }
-
-  public static final class TimeoutApplication implements TwillApplication {
-
-    @Override
-    public TwillSpecification configure() {
-      return TwillSpecification.Builder.with()
-        .setName("TimeoutApplication")
-        .withRunnable()
-        .add(new TimeoutRunnable(),
-             ResourceSpecification.Builder.with()
-               .setVirtualCores(1)
-               .setMemory(8, ResourceSpecification.SizeUnit.GIGA).build())
-        .noLocalFiles()
-        .anyOrder()
-        .withEventHandler(new Handler())
-        .build();
-    }
-  }
-
-  /**
-   * A runnable that does nothing, as it's not expected to get provisioned.
-   */
-  public static final class TimeoutRunnable extends AbstractTwillRunnable {
-
-    private final CountDownLatch latch = new CountDownLatch(1);
-
-    @Override
-    public void stop() {
-      latch.countDown();
-    }
-
-    @Override
-    public void run() {
-      // Simply block here
-      try {
-        latch.await();
-      } catch (InterruptedException e) {
-        throw Throwables.propagate(e);
-      }
-    }
-  }
-}