You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:27 UTC

[11/28] [TWILL-14] Bootstrapping for the site generation. Reorganization of the source tree happens:

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
new file mode 100644
index 0000000..4f7597b
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.filesystem.ForwardingLocationFactory;
+import org.apache.twill.filesystem.HDFSLocationFactory;
+import org.apache.twill.filesystem.LocationFactory;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Collection of helper methods to simplify YARN calls.
+ */
+public class YarnUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(YarnUtils.class);
+  private static final AtomicReference<Boolean> HADOOP_20 = new AtomicReference<Boolean>();
+
+  public static YarnLocalResource createLocalResource(LocalFile localFile) {
+    Preconditions.checkArgument(localFile.getLastModified() >= 0, "Last modified time should be >= 0.");
+    Preconditions.checkArgument(localFile.getSize() >= 0, "File size should be >= 0.");
+
+    YarnLocalResource resource = createAdapter(YarnLocalResource.class);
+    resource.setVisibility(LocalResourceVisibility.APPLICATION);
+    resource.setResource(ConverterUtils.getYarnUrlFromURI(localFile.getURI()));
+    resource.setTimestamp(localFile.getLastModified());
+    resource.setSize(localFile.getSize());
+    return setLocalResourceType(resource, localFile);
+  }
+
+  public static YarnLaunchContext createLaunchContext() {
+    return createAdapter(YarnLaunchContext.class);
+  }
+
+  // temporary workaround since older versions of hadoop don't have the getVirtualCores method.
+  public static int getVirtualCores(Resource resource) {
+    try {
+      Method getVirtualCores = Resource.class.getMethod("getVirtualCores");
+      return (Integer) getVirtualCores.invoke(resource);
+    } catch (Exception e) {
+      return 0;
+    }
+  }
+
+  /**
+   * Temporary workaround since older versions of hadoop don't have the setCores method.
+   *
+   * @param resource
+   * @param cores
+   * @return true if virtual cores was set, false if not.
+   */
+  public static boolean setVirtualCores(Resource resource, int cores) {
+    try {
+      Method setVirtualCores = Resource.class.getMethod("setVirtualCores", int.class);
+      setVirtualCores.invoke(resource, cores);
+    } catch (Exception e) {
+      // It's ok to ignore this exception, as it's using older version of API.
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Creates {@link ApplicationId} from the given cluster timestamp and id.
+   */
+  public static ApplicationId createApplicationId(long timestamp, int id) {
+    try {
+      try {
+        // For Hadoop-2.1
+        Method method = ApplicationId.class.getMethod("newInstance", long.class, int.class);
+        return (ApplicationId) method.invoke(null, timestamp, id);
+      } catch (NoSuchMethodException e) {
+        // Try with Hadoop-2.0 way
+        ApplicationId appId = Records.newRecord(ApplicationId.class);
+
+        Method setClusterTimestamp = ApplicationId.class.getMethod("setClusterTimestamp", long.class);
+        Method setId = ApplicationId.class.getMethod("setId", int.class);
+
+        setClusterTimestamp.invoke(appId, timestamp);
+        setId.invoke(appId, id);
+
+        return appId;
+      }
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Helper method to get delegation tokens for the given LocationFactory.
+   * @param config The hadoop configuration.
+   * @param locationFactory The LocationFactory for generating tokens.
+   * @param credentials Credentials for storing tokens acquired.
+   * @return List of delegation Tokens acquired.
+   */
+  public static List<Token<?>> addDelegationTokens(Configuration config,
+                                                   LocationFactory locationFactory,
+                                                   Credentials credentials) throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      LOG.debug("Security is not enabled");
+      return ImmutableList.of();
+    }
+
+    FileSystem fileSystem = getFileSystem(locationFactory);
+
+    if (fileSystem == null) {
+      LOG.debug("LocationFactory is not HDFS");
+      return ImmutableList.of();
+    }
+
+    String renewer = getYarnTokenRenewer(config);
+
+    Token<?>[] tokens = fileSystem.addDelegationTokens(renewer, credentials);
+    return tokens == null ? ImmutableList.<Token<?>>of() : ImmutableList.copyOf(tokens);
+  }
+
+  public static ByteBuffer encodeCredentials(Credentials credentials) {
+    try {
+      DataOutputBuffer out = new DataOutputBuffer();
+      credentials.writeTokenStorageToStream(out);
+      return ByteBuffer.wrap(out.getData(), 0, out.getLength());
+    } catch (IOException e) {
+      // Shouldn't throw
+      LOG.error("Failed to encode Credentials.", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Decodes {@link Credentials} from the given buffer.
+   * If the buffer is null or empty, it returns an empty Credentials.
+   */
+  public static Credentials decodeCredentials(ByteBuffer buffer) throws IOException {
+    Credentials credentials = new Credentials();
+    if (buffer != null && buffer.hasRemaining()) {
+      DataInputByteBuffer in = new DataInputByteBuffer();
+      in.reset(buffer);
+      credentials.readTokenStorageStream(in);
+    }
+    return credentials;
+  }
+
+  public static String getYarnTokenRenewer(Configuration config) throws IOException {
+    String rmHost = getRMAddress(config).getHostName();
+    String renewer = SecurityUtil.getServerPrincipal(config.get(YarnConfiguration.RM_PRINCIPAL), rmHost);
+
+    if (renewer == null || renewer.length() == 0) {
+      throw new IOException("No Kerberos principal for Yarn RM to use as renewer");
+    }
+
+    return renewer;
+  }
+
+  public static InetSocketAddress getRMAddress(Configuration config) {
+    return config.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+                                YarnConfiguration.DEFAULT_RM_ADDRESS,
+                                YarnConfiguration.DEFAULT_RM_PORT);
+  }
+
+  /**
+   * Returns true if Hadoop-2.0 classes are in the classpath.
+   */
+  public static boolean isHadoop20() {
+    Boolean hadoop20 = HADOOP_20.get();
+    if (hadoop20 != null) {
+      return hadoop20;
+    }
+    try {
+      Class.forName("org.apache.hadoop.yarn.client.api.NMClient");
+      HADOOP_20.set(false);
+      return false;
+    } catch (ClassNotFoundException e) {
+      HADOOP_20.set(true);
+      return true;
+    }
+  }
+
+  /**
+   * Helper method to create adapter class for bridging between Hadoop 2.0 and 2.1
+   */
+  private static <T> T createAdapter(Class<T> clz) {
+    String className = clz.getPackage().getName();
+
+    if (isHadoop20()) {
+      className += ".Hadoop20" + clz.getSimpleName();
+    } else {
+      className += ".Hadoop21" + clz.getSimpleName();
+    }
+
+    try {
+      return (T) Class.forName(className).newInstance();
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private static YarnLocalResource setLocalResourceType(YarnLocalResource localResource, LocalFile localFile) {
+    if (localFile.isArchive()) {
+      if (localFile.getPattern() == null) {
+        localResource.setType(LocalResourceType.ARCHIVE);
+      } else {
+        localResource.setType(LocalResourceType.PATTERN);
+        localResource.setPattern(localFile.getPattern());
+      }
+    } else {
+      localResource.setType(LocalResourceType.FILE);
+    }
+    return localResource;
+  }
+
+  private static <T> Map<String, T> transformResource(Map<String, YarnLocalResource> from) {
+    return Maps.transformValues(from, new Function<YarnLocalResource, T>() {
+      @Override
+      public T apply(YarnLocalResource resource) {
+        return resource.getLocalResource();
+      }
+    });
+  }
+
+  /**
+   * Gets the Hadoop FileSystem from LocationFactory.
+   */
+  private static FileSystem getFileSystem(LocationFactory locationFactory) {
+    if (locationFactory instanceof HDFSLocationFactory) {
+      return ((HDFSLocationFactory) locationFactory).getFileSystem();
+    }
+    if (locationFactory instanceof ForwardingLocationFactory) {
+      return getFileSystem(((ForwardingLocationFactory) locationFactory).getDelegate());
+    }
+    return null;
+  }
+
+  private YarnUtils() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/internal/yarn/package-info.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/package-info.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/package-info.java
new file mode 100644
index 0000000..d6ec9f7
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains class for interacting with Yarn.
+ */
+package org.apache.twill.internal.yarn;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java
new file mode 100644
index 0000000..4d20c9c
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationSecureStoreUpdater.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.SecureStore;
+import org.apache.twill.api.SecureStoreUpdater;
+import org.apache.twill.filesystem.LocationFactory;
+import org.apache.twill.internal.yarn.YarnUtils;
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.IOException;
+
+/**
+ * Package private class for updating location related secure store.
+ */
+final class LocationSecureStoreUpdater implements SecureStoreUpdater {
+
+  private final Configuration configuration;
+  private final LocationFactory locationFactory;
+
+  LocationSecureStoreUpdater(Configuration configuration, LocationFactory locationFactory) {
+    this.configuration = configuration;
+    this.locationFactory = locationFactory;
+  }
+
+  @Override
+  public SecureStore update(String application, RunId runId) {
+    try {
+      Credentials credentials = new Credentials();
+      YarnUtils.addDelegationTokens(configuration, locationFactory, credentials);
+      return YarnSecureStore.create(credentials);
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java b/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
new file mode 100644
index 0000000..2974c3f
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.internal.json.ResourceReportAdapter;
+import com.google.common.base.Charsets;
+import com.google.common.io.Closeables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URL;
+
+/**
+ * Package private class to get {@link ResourceReport} from the application master.
+ */
+final class ResourceReportClient {
+  private static final Logger LOG = LoggerFactory.getLogger(ResourceReportClient.class);
+
+  private final ResourceReportAdapter reportAdapter;
+  private final URL resourceUrl;
+
+  ResourceReportClient(URL resourceUrl) {
+    this.resourceUrl = resourceUrl;
+    this.reportAdapter = ResourceReportAdapter.create();
+  }
+
+  /**
+   * Returns the resource usage of the application fetched from the resource endpoint URL.
+   * @return A {@link ResourceReport} or {@code null} if failed to fetch the report.
+   */
+  public ResourceReport get() {
+    try {
+      Reader reader = new BufferedReader(new InputStreamReader(resourceUrl.openStream(), Charsets.UTF_8));
+      try {
+        return reportAdapter.fromJson(reader);
+      } finally {
+        Closeables.closeQuietly(reader);
+      }
+    } catch (Exception e) {
+      LOG.error("Exception getting resource report from {}.", resourceUrl, e);
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/yarn/YarnSecureStore.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnSecureStore.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnSecureStore.java
new file mode 100644
index 0000000..e6f461a
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnSecureStore.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.SecureStore;
+import org.apache.hadoop.security.Credentials;
+
+/**
+ * A {@link SecureStore} for hadoop credentials.
+ */
+public final class YarnSecureStore implements SecureStore {
+
+  private final Credentials credentials;
+
+  public static SecureStore create(Credentials credentials) {
+    return new YarnSecureStore(credentials);
+  }
+
+  private YarnSecureStore(Credentials credentials) {
+    this.credentials = credentials;
+  }
+
+  @Override
+  public Credentials getStore() {
+    return credentials;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
new file mode 100644
index 0000000..4c240fb
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.internal.AbstractTwillController;
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.appmaster.TrackerService;
+import org.apache.twill.internal.state.StateNode;
+import org.apache.twill.internal.state.SystemMessages;
+import org.apache.twill.internal.yarn.YarnApplicationReport;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link org.apache.twill.api.TwillController} that controllers application running on Hadoop YARN.
+ */
+final class YarnTwillController extends AbstractTwillController implements TwillController {
+
+  private static final Logger LOG = LoggerFactory.getLogger(YarnTwillController.class);
+
+  private final Callable<ProcessController<YarnApplicationReport>> startUp;
+  private ProcessController<YarnApplicationReport> processController;
+  private ResourceReportClient resourcesClient;
+
+  /**
+   * Creates an instance without any {@link LogHandler}.
+   */
+  YarnTwillController(RunId runId, ZKClient zkClient, Callable<ProcessController<YarnApplicationReport>> startUp) {
+    this(runId, zkClient, ImmutableList.<LogHandler>of(), startUp);
+  }
+
+  YarnTwillController(RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers,
+                      Callable<ProcessController<YarnApplicationReport>> startUp) {
+    super(runId, zkClient, logHandlers);
+    this.startUp = startUp;
+  }
+
+
+  /**
+   * Sends a message to application to notify the secure store has be updated.
+   */
+  ListenableFuture<Void> secureStoreUpdated() {
+    return sendMessage(SystemMessages.SECURE_STORE_UPDATED, null);
+  }
+
+  @Override
+  protected void doStartUp() {
+    super.doStartUp();
+
+    // Submit and poll the status of the yarn application
+    try {
+      processController = startUp.call();
+
+      YarnApplicationReport report = processController.getReport();
+      LOG.debug("Application {} submit", report.getApplicationId());
+
+      YarnApplicationState state = report.getYarnApplicationState();
+      StopWatch stopWatch = new StopWatch();
+      stopWatch.start();
+      stopWatch.split();
+      long maxTime = TimeUnit.MILLISECONDS.convert(Constants.APPLICATION_MAX_START_SECONDS, TimeUnit.SECONDS);
+
+      LOG.info("Checking yarn application status");
+      while (!hasRun(state) && stopWatch.getSplitTime() < maxTime) {
+        report = processController.getReport();
+        state = report.getYarnApplicationState();
+        LOG.debug("Yarn application status: {}", state);
+        TimeUnit.SECONDS.sleep(1);
+        stopWatch.split();
+      }
+      LOG.info("Yarn application is in state {}", state);
+      if (state != YarnApplicationState.RUNNING) {
+        LOG.info("Yarn application is not in running state. Shutting down controller.",
+                 Constants.APPLICATION_MAX_START_SECONDS);
+        forceShutDown();
+      } else {
+        try {
+          URL resourceUrl = URI.create(String.format("http://%s:%d", report.getHost(), report.getRpcPort()))
+                               .resolve(TrackerService.PATH).toURL();
+          resourcesClient = new ResourceReportClient(resourceUrl);
+        } catch (IOException e) {
+          resourcesClient = null;
+        }
+      }
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  protected void doShutDown() {
+    if (processController == null) {
+      LOG.warn("No process controller for application that is not submitted.");
+      return;
+    }
+
+    // Wait for the stop message being processed
+    try {
+      Uninterruptibles.getUninterruptibly(getStopMessageFuture(),
+                                          Constants.APPLICATION_MAX_STOP_SECONDS, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.error("Failed to wait for stop message being processed.", e);
+      // Kill the application through yarn
+      kill();
+    }
+
+    // Poll application status from yarn
+    try {
+      StopWatch stopWatch = new StopWatch();
+      stopWatch.start();
+      stopWatch.split();
+      long maxTime = TimeUnit.MILLISECONDS.convert(Constants.APPLICATION_MAX_STOP_SECONDS, TimeUnit.SECONDS);
+
+      YarnApplicationReport report = processController.getReport();
+      FinalApplicationStatus finalStatus = report.getFinalApplicationStatus();
+      while (finalStatus == FinalApplicationStatus.UNDEFINED && stopWatch.getSplitTime() < maxTime) {
+        LOG.debug("Yarn application final status for {} {}", report.getApplicationId(), finalStatus);
+        TimeUnit.SECONDS.sleep(1);
+        stopWatch.split();
+        finalStatus = processController.getReport().getFinalApplicationStatus();
+      }
+      LOG.debug("Yarn application final status is {}", finalStatus);
+
+      // Application not finished after max stop time, kill the application
+      if (finalStatus == FinalApplicationStatus.UNDEFINED) {
+        kill();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception while waiting for application report: {}", e.getMessage(), e);
+      kill();
+    }
+
+    super.doShutDown();
+  }
+
+  @Override
+  public void kill() {
+    if (processController != null) {
+      YarnApplicationReport report = processController.getReport();
+      LOG.info("Killing application {}", report.getApplicationId());
+      processController.cancel();
+    } else {
+      LOG.warn("No process controller for application that is not submitted.");
+    }
+  }
+
+  @Override
+  protected void instanceNodeUpdated(NodeData nodeData) {
+
+  }
+
+  @Override
+  protected void stateNodeUpdated(StateNode stateNode) {
+
+  }
+
+  private boolean hasRun(YarnApplicationState state) {
+    switch (state) {
+      case RUNNING:
+      case FINISHED:
+      case FAILED:
+      case KILLED:
+        return true;
+    }
+    return false;
+  }
+
+  @Override
+  public ResourceReport getResourceReport() {
+    // in case the user calls this before starting, return null
+    return (resourcesClient == null) ? null : resourcesClient.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
new file mode 100644
index 0000000..11c2ae6
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillControllerFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.yarn.YarnApplicationReport;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Factory for creating {@link YarnTwillController}.
+ */
+interface YarnTwillControllerFactory {
+
+  YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
+                             Callable<ProcessController<YarnApplicationReport>> startUp);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
new file mode 100644
index 0000000..17425d4
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CharStreams;
+import com.google.common.io.OutputSupplier;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.GsonBuilder;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.SecureStore;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillPreparer;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.filesystem.LocationFactory;
+import org.apache.twill.internal.ApplicationBundler;
+import org.apache.twill.internal.Arguments;
+import org.apache.twill.internal.Configs;
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.DefaultLocalFile;
+import org.apache.twill.internal.DefaultRuntimeSpecification;
+import org.apache.twill.internal.DefaultTwillSpecification;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.LogOnlyEventHandler;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.appmaster.ApplicationMasterMain;
+import org.apache.twill.internal.container.TwillContainerMain;
+import org.apache.twill.internal.json.ArgumentsCodec;
+import org.apache.twill.internal.json.LocalFileCodec;
+import org.apache.twill.internal.json.TwillSpecificationAdapter;
+import org.apache.twill.internal.utils.Dependencies;
+import org.apache.twill.internal.utils.Paths;
+import org.apache.twill.internal.yarn.YarnAppClient;
+import org.apache.twill.internal.yarn.YarnApplicationReport;
+import org.apache.twill.internal.yarn.YarnUtils;
+import org.apache.twill.launcher.TwillLauncher;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClients;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+
+/**
+ * Implementation for {@link TwillPreparer} to prepare and launch distributed application on Hadoop YARN.
+ */
+final class YarnTwillPreparer implements TwillPreparer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(YarnTwillPreparer.class);
+  private static final String KAFKA_ARCHIVE = "kafka-0.7.2.tgz";
+
+  private final YarnConfiguration yarnConfig;
+  private final TwillSpecification twillSpec;
+  private final YarnAppClient yarnAppClient;
+  private final ZKClient zkClient;
+  private final LocationFactory locationFactory;
+  private final Supplier<String> jvmOpts;
+  private final YarnTwillControllerFactory controllerFactory;
+  private final RunId runId;
+
+  private final List<LogHandler> logHandlers = Lists.newArrayList();
+  private final List<String> arguments = Lists.newArrayList();
+  private final Set<Class<?>> dependencies = Sets.newIdentityHashSet();
+  private final List<URI> resources = Lists.newArrayList();
+  private final List<String> classPaths = Lists.newArrayList();
+  private final ListMultimap<String, String> runnableArgs = ArrayListMultimap.create();
+  private final Credentials credentials;
+  private final int reservedMemory;
+  private String user;
+
+  YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec, YarnAppClient yarnAppClient,
+                    ZKClient zkClient, LocationFactory locationFactory, Supplier<String> jvmOpts,
+                    YarnTwillControllerFactory controllerFactory) {
+    this.yarnConfig = yarnConfig;
+    this.twillSpec = twillSpec;
+    this.yarnAppClient = yarnAppClient;
+    this.zkClient = ZKClients.namespace(zkClient, "/" + twillSpec.getName());
+    this.locationFactory = locationFactory;
+    this.jvmOpts = jvmOpts;
+    this.controllerFactory = controllerFactory;
+    this.runId = RunIds.generate();
+    this.credentials = createCredentials();
+    this.reservedMemory = yarnConfig.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB,
+                                            Configs.Defaults.JAVA_RESERVED_MEMORY_MB);
+    this.user = System.getProperty("user.name");
+  }
+
+  @Override
+  public TwillPreparer addLogHandler(LogHandler handler) {
+    logHandlers.add(handler);
+    return this;
+  }
+
+  @Override
+  public TwillPreparer setUser(String user) {
+    this.user = user;
+    return this;
+  }
+
+  @Override
+  public TwillPreparer withApplicationArguments(String... args) {
+    return withApplicationArguments(ImmutableList.copyOf(args));
+  }
+
+  @Override
+  public TwillPreparer withApplicationArguments(Iterable<String> args) {
+    Iterables.addAll(arguments, args);
+    return this;
+  }
+
+  @Override
+  public TwillPreparer withArguments(String runnableName, String... args) {
+    return withArguments(runnableName, ImmutableList.copyOf(args));
+  }
+
+  @Override
+  public TwillPreparer withArguments(String runnableName, Iterable<String> args) {
+    runnableArgs.putAll(runnableName, args);
+    return this;
+  }
+
+  @Override
+  public TwillPreparer withDependencies(Class<?>... classes) {
+    return withDependencies(ImmutableList.copyOf(classes));
+  }
+
+  @Override
+  public TwillPreparer withDependencies(Iterable<Class<?>> classes) {
+    Iterables.addAll(dependencies, classes);
+    return this;
+  }
+
+  @Override
+  public TwillPreparer withResources(URI... resources) {
+    return withResources(ImmutableList.copyOf(resources));
+  }
+
+  @Override
+  public TwillPreparer withResources(Iterable<URI> resources) {
+    Iterables.addAll(this.resources, resources);
+    return this;
+  }
+
+  @Override
+  public TwillPreparer withClassPaths(String... classPaths) {
+    return withClassPaths(ImmutableList.copyOf(classPaths));
+  }
+
+  @Override
+  public TwillPreparer withClassPaths(Iterable<String> classPaths) {
+    Iterables.addAll(this.classPaths, classPaths);
+    return this;
+  }
+
+  @Override
+  public TwillPreparer addSecureStore(SecureStore secureStore) {
+    Object store = secureStore.getStore();
+    Preconditions.checkArgument(store instanceof Credentials, "Only Hadoop Credentials is supported.");
+    this.credentials.mergeAll((Credentials) store);
+    return this;
+  }
+
+  @Override
+  public TwillController start() {
+    try {
+      final ProcessLauncher<ApplicationId> launcher = yarnAppClient.createLauncher(user, twillSpec);
+      final ApplicationId appId = launcher.getContainerInfo();
+
+      Callable<ProcessController<YarnApplicationReport>> submitTask =
+        new Callable<ProcessController<YarnApplicationReport>>() {
+        @Override
+        public ProcessController<YarnApplicationReport> call() throws Exception {
+          String fsUser = locationFactory.getHomeLocation().getName();
+
+          // Local files needed by AM
+          Map<String, LocalFile> localFiles = Maps.newHashMap();
+          // Local files declared by runnables
+          Multimap<String, LocalFile> runnableLocalFiles = HashMultimap.create();
+
+          String vmOpts = jvmOpts.get();
+
+          createAppMasterJar(createBundler(), localFiles);
+          createContainerJar(createBundler(), localFiles);
+          populateRunnableLocalFiles(twillSpec, runnableLocalFiles);
+          saveSpecification(twillSpec, runnableLocalFiles, localFiles);
+          saveLogback(localFiles);
+          saveLauncher(localFiles);
+          saveKafka(localFiles);
+          saveVmOptions(vmOpts, localFiles);
+          saveArguments(new Arguments(arguments, runnableArgs), localFiles);
+          saveLocalFiles(localFiles, ImmutableSet.of(Constants.Files.TWILL_SPEC,
+                                                     Constants.Files.LOGBACK_TEMPLATE,
+                                                     Constants.Files.CONTAINER_JAR,
+                                                     Constants.Files.LAUNCHER_JAR,
+                                                     Constants.Files.ARGUMENTS));
+
+          LOG.debug("Submit AM container spec: {}", appId);
+          // java -Djava.io.tmpdir=tmp -cp launcher.jar:$HADOOP_CONF_DIR -XmxMemory
+          //     org.apache.twill.internal.TwillLauncher
+          //     appMaster.jar
+          //     org.apache.twill.internal.appmaster.ApplicationMasterMain
+          //     false
+          return launcher.prepareLaunch(
+            ImmutableMap.<String, String>builder()
+              .put(EnvKeys.TWILL_FS_USER, fsUser)
+              .put(EnvKeys.TWILL_APP_DIR, getAppLocation().toURI().toASCIIString())
+              .put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString())
+              .put(EnvKeys.TWILL_RUN_ID, runId.getId())
+              .put(EnvKeys.TWILL_RESERVED_MEMORY_MB, Integer.toString(reservedMemory))
+              .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName()).build(),
+            localFiles.values(), credentials)
+            .noResources()
+            .noEnvironment()
+            .withCommands().add(
+              "java",
+              "-Djava.io.tmpdir=tmp",
+              "-Dyarn.appId=$" + EnvKeys.YARN_APP_ID_STR,
+              "-Dtwill.app=$" + EnvKeys.TWILL_APP_NAME,
+              "-cp", Constants.Files.LAUNCHER_JAR + ":$HADOOP_CONF_DIR",
+              "-Xmx" + (Constants.APP_MASTER_MEMORY_MB - Constants.APP_MASTER_RESERVED_MEMORY_MB) + "m",
+              vmOpts,
+              TwillLauncher.class.getName(),
+              Constants.Files.APP_MASTER_JAR,
+              ApplicationMasterMain.class.getName(),
+              Boolean.FALSE.toString())
+            .redirectOutput(Constants.STDOUT)
+            .redirectError(Constants.STDERR)
+            .launch();
+        }
+      };
+
+      YarnTwillController controller = controllerFactory.create(runId, logHandlers, submitTask);
+      controller.start();
+      return controller;
+    } catch (Exception e) {
+      LOG.error("Failed to submit application {}", twillSpec.getName(), e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private Credentials createCredentials() {
+    Credentials credentials = new Credentials();
+
+    try {
+      credentials.addAll(UserGroupInformation.getCurrentUser().getCredentials());
+
+      List<Token<?>> tokens = YarnUtils.addDelegationTokens(yarnConfig, locationFactory, credentials);
+      for (Token<?> token : tokens) {
+        LOG.debug("Delegation token acquired for {}, {}", locationFactory.getHomeLocation().toURI(), token);
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to check for secure login type. Not gathering any delegation token.", e);
+    }
+    return credentials;
+  }
+
+  private ApplicationBundler createBundler() {
+    return new ApplicationBundler(ImmutableList.<String>of());
+  }
+
+  private LocalFile createLocalFile(String name, Location location) throws IOException {
+    return createLocalFile(name, location, false);
+  }
+
+  private LocalFile createLocalFile(String name, Location location, boolean archive) throws IOException {
+    return new DefaultLocalFile(name, location.toURI(), location.lastModified(), location.length(), archive, null);
+  }
+
+  private void createAppMasterJar(ApplicationBundler bundler, Map<String, LocalFile> localFiles) throws IOException {
+    try {
+      LOG.debug("Create and copy {}", Constants.Files.APP_MASTER_JAR);
+      Location location = createTempLocation(Constants.Files.APP_MASTER_JAR);
+
+      List<Class<?>> classes = Lists.newArrayList();
+      classes.add(ApplicationMasterMain.class);
+
+      // Stuck in the yarnAppClient class to make bundler being able to pickup the right yarn-client version
+      classes.add(yarnAppClient.getClass());
+
+      // Add the TwillRunnableEventHandler class
+      if (twillSpec.getEventHandler() != null) {
+        classes.add(getClassLoader().loadClass(twillSpec.getEventHandler().getClassName()));
+      }
+
+      bundler.createBundle(location, classes);
+      LOG.debug("Done {}", Constants.Files.APP_MASTER_JAR);
+
+      localFiles.put(Constants.Files.APP_MASTER_JAR, createLocalFile(Constants.Files.APP_MASTER_JAR, location));
+    } catch (ClassNotFoundException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private void createContainerJar(ApplicationBundler bundler, Map<String, LocalFile> localFiles) throws IOException {
+    try {
+      Set<Class<?>> classes = Sets.newIdentityHashSet();
+      classes.add(TwillContainerMain.class);
+      classes.addAll(dependencies);
+
+      ClassLoader classLoader = getClassLoader();
+      for (RuntimeSpecification spec : twillSpec.getRunnables().values()) {
+        classes.add(classLoader.loadClass(spec.getRunnableSpecification().getClassName()));
+      }
+
+      LOG.debug("Create and copy {}", Constants.Files.CONTAINER_JAR);
+      Location location = createTempLocation(Constants.Files.CONTAINER_JAR);
+      bundler.createBundle(location, classes, resources);
+      LOG.debug("Done {}", Constants.Files.CONTAINER_JAR);
+
+      localFiles.put(Constants.Files.CONTAINER_JAR, createLocalFile(Constants.Files.CONTAINER_JAR, location));
+
+    } catch (ClassNotFoundException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Based on the given {@link TwillSpecification}, upload LocalFiles to Yarn Cluster.
+   * @param twillSpec The {@link TwillSpecification} for populating resource.
+   * @param localFiles A Multimap to store runnable name to transformed LocalFiles.
+   * @throws IOException
+   */
+  private void populateRunnableLocalFiles(TwillSpecification twillSpec,
+                                          Multimap<String, LocalFile> localFiles) throws IOException {
+
+    LOG.debug("Populating Runnable LocalFiles");
+    for (Map.Entry<String, RuntimeSpecification> entry: twillSpec.getRunnables().entrySet()) {
+      String runnableName = entry.getKey();
+      for (LocalFile localFile : entry.getValue().getLocalFiles()) {
+        Location location;
+
+        URI uri = localFile.getURI();
+        if ("hdfs".equals(uri.getScheme())) {
+          // Assuming the location factory is HDFS one. If it is not, it will failed, which is the correct behavior.
+          location = locationFactory.create(uri);
+        } else {
+          URL url = uri.toURL();
+          LOG.debug("Create and copy {} : {}", runnableName, url);
+          // Preserves original suffix for expansion.
+          location = copyFromURL(url, createTempLocation(Paths.appendSuffix(url.getFile(), localFile.getName())));
+          LOG.debug("Done {} : {}", runnableName, url);
+        }
+
+        localFiles.put(runnableName,
+                       new DefaultLocalFile(localFile.getName(), location.toURI(), location.lastModified(),
+                                            location.length(), localFile.isArchive(), localFile.getPattern()));
+      }
+    }
+    LOG.debug("Done Runnable LocalFiles");
+  }
+
+  private void saveSpecification(TwillSpecification spec, final Multimap<String, LocalFile> runnableLocalFiles,
+                                 Map<String, LocalFile> localFiles) throws IOException {
+    // Rewrite LocalFiles inside twillSpec
+    Map<String, RuntimeSpecification> runtimeSpec = Maps.transformEntries(
+      spec.getRunnables(), new Maps.EntryTransformer<String, RuntimeSpecification, RuntimeSpecification>() {
+      @Override
+      public RuntimeSpecification transformEntry(String key, RuntimeSpecification value) {
+        return new DefaultRuntimeSpecification(value.getName(), value.getRunnableSpecification(),
+                                               value.getResourceSpecification(), runnableLocalFiles.get(key));
+      }
+    });
+
+    // Serialize into a local temp file.
+    LOG.debug("Create and copy {}", Constants.Files.TWILL_SPEC);
+    Location location = createTempLocation(Constants.Files.TWILL_SPEC);
+    Writer writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8);
+    try {
+      EventHandlerSpecification eventHandler = spec.getEventHandler();
+      if (eventHandler == null) {
+        eventHandler = new LogOnlyEventHandler().configure();
+      }
+
+      TwillSpecificationAdapter.create().toJson(
+        new DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(), eventHandler),
+        writer);
+    } finally {
+      writer.close();
+    }
+    LOG.debug("Done {}", Constants.Files.TWILL_SPEC);
+
+    localFiles.put(Constants.Files.TWILL_SPEC, createLocalFile(Constants.Files.TWILL_SPEC, location));
+  }
+
+  private void saveLogback(Map<String, LocalFile> localFiles) throws IOException {
+    LOG.debug("Create and copy {}", Constants.Files.LOGBACK_TEMPLATE);
+    Location location = copyFromURL(getClass().getClassLoader().getResource(Constants.Files.LOGBACK_TEMPLATE),
+                                    createTempLocation(Constants.Files.LOGBACK_TEMPLATE));
+    LOG.debug("Done {}", Constants.Files.LOGBACK_TEMPLATE);
+
+    localFiles.put(Constants.Files.LOGBACK_TEMPLATE, createLocalFile(Constants.Files.LOGBACK_TEMPLATE, location));
+  }
+
+  /**
+   * Creates the launcher.jar for launch the main application.
+   */
+  private void saveLauncher(Map<String, LocalFile> localFiles) throws URISyntaxException, IOException {
+
+    LOG.debug("Create and copy {}", Constants.Files.LAUNCHER_JAR);
+    Location location = createTempLocation(Constants.Files.LAUNCHER_JAR);
+
+    final String launcherName = TwillLauncher.class.getName();
+
+    // Create a jar file with the TwillLauncher optionally a json serialized classpath.json in it.
+    final JarOutputStream jarOut = new JarOutputStream(location.getOutputStream());
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    if (classLoader == null) {
+      classLoader = getClass().getClassLoader();
+    }
+    Dependencies.findClassDependencies(classLoader, new Dependencies.ClassAcceptor() {
+      @Override
+      public boolean accept(String className, URL classUrl, URL classPathUrl) {
+        Preconditions.checkArgument(className.startsWith(launcherName),
+                                    "Launcher jar should not have dependencies: %s", className);
+        try {
+          jarOut.putNextEntry(new JarEntry(className.replace('.', '/') + ".class"));
+          InputStream is = classUrl.openStream();
+          try {
+            ByteStreams.copy(is, jarOut);
+          } finally {
+            is.close();
+          }
+        } catch (IOException e) {
+          throw Throwables.propagate(e);
+        }
+        return true;
+      }
+    }, TwillLauncher.class.getName());
+
+    try {
+      if (!classPaths.isEmpty()) {
+        jarOut.putNextEntry(new JarEntry("classpath"));
+        jarOut.write(Joiner.on(':').join(classPaths).getBytes(Charsets.UTF_8));
+      }
+    } finally {
+      jarOut.close();
+    }
+    LOG.debug("Done {}", Constants.Files.LAUNCHER_JAR);
+
+    localFiles.put(Constants.Files.LAUNCHER_JAR, createLocalFile(Constants.Files.LAUNCHER_JAR, location));
+  }
+
+  private void saveKafka(Map<String, LocalFile> localFiles) throws IOException {
+    LOG.debug("Copy {}", Constants.Files.KAFKA);
+    Location location = copyFromURL(getClass().getClassLoader().getResource(KAFKA_ARCHIVE),
+                                    createTempLocation(Constants.Files.KAFKA));
+    LOG.debug("Done {}", Constants.Files.KAFKA);
+
+    localFiles.put(Constants.Files.KAFKA, createLocalFile(Constants.Files.KAFKA, location, true));
+  }
+
+  private void saveVmOptions(String opts, Map<String, LocalFile> localFiles) throws IOException {
+    if (opts.isEmpty()) {
+      // If no vm options, no need to localize the file.
+      return;
+    }
+    LOG.debug("Copy {}", Constants.Files.JVM_OPTIONS);
+    final Location location = createTempLocation(Constants.Files.JVM_OPTIONS);
+    CharStreams.write(opts, new OutputSupplier<Writer>() {
+      @Override
+      public Writer getOutput() throws IOException {
+        return new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8);
+      }
+    });
+    LOG.debug("Done {}", Constants.Files.JVM_OPTIONS);
+
+    localFiles.put(Constants.Files.JVM_OPTIONS, createLocalFile(Constants.Files.JVM_OPTIONS, location));
+  }
+
+  private void saveArguments(Arguments arguments, Map<String, LocalFile> localFiles) throws IOException {
+    LOG.debug("Create and copy {}", Constants.Files.ARGUMENTS);
+    final Location location = createTempLocation(Constants.Files.ARGUMENTS);
+    ArgumentsCodec.encode(arguments, new OutputSupplier<Writer>() {
+      @Override
+      public Writer getOutput() throws IOException {
+        return new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8);
+      }
+    });
+    LOG.debug("Done {}", Constants.Files.ARGUMENTS);
+
+    localFiles.put(Constants.Files.ARGUMENTS, createLocalFile(Constants.Files.ARGUMENTS, location));
+  }
+
+  /**
+   * Serializes the list of files that needs to localize from AM to Container.
+   */
+  private void saveLocalFiles(Map<String, LocalFile> localFiles, Set<String> includes) throws IOException {
+    Map<String, LocalFile> localize = ImmutableMap.copyOf(Maps.filterKeys(localFiles, Predicates.in(includes)));
+    LOG.debug("Create and copy {}", Constants.Files.LOCALIZE_FILES);
+    Location location = createTempLocation(Constants.Files.LOCALIZE_FILES);
+    Writer writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8);
+    try {
+      new GsonBuilder().registerTypeAdapter(LocalFile.class, new LocalFileCodec())
+        .create().toJson(localize.values(), new TypeToken<List<LocalFile>>() {
+      }.getType(), writer);
+    } finally {
+      writer.close();
+    }
+    LOG.debug("Done {}", Constants.Files.LOCALIZE_FILES);
+    localFiles.put(Constants.Files.LOCALIZE_FILES, createLocalFile(Constants.Files.LOCALIZE_FILES, location));
+  }
+
+  private Location copyFromURL(URL url, Location target) throws IOException {
+    InputStream is = url.openStream();
+    try {
+      OutputStream os = new BufferedOutputStream(target.getOutputStream());
+      try {
+        ByteStreams.copy(is, os);
+      } finally {
+        os.close();
+      }
+    } finally {
+      is.close();
+    }
+    return target;
+  }
+
+  private Location createTempLocation(String fileName) {
+    String name;
+    String suffix = Paths.getExtension(fileName);
+
+    name = fileName.substring(0, fileName.length() - suffix.length() - 1);
+
+    try {
+      return getAppLocation().append(name).getTempFile('.' + suffix);
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private Location getAppLocation() {
+    return locationFactory.create(String.format("/%s/%s", twillSpec.getName(), runId.getId()));
+  }
+
+  /**
+   * Returns the context ClassLoader if there is any, otherwise, returns ClassLoader of this class.
+   */
+  private ClassLoader getClassLoader() {
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    return classLoader == null ? getClass().getClassLoader() : classLoader;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
new file mode 100644
index 0000000..9335465
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -0,0 +1,583 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.SecureStore;
+import org.apache.twill.api.SecureStoreUpdater;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillPreparer;
+import org.apache.twill.api.TwillRunnable;
+import org.apache.twill.api.TwillRunnerService;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.common.ServiceListenerAdapter;
+import org.apache.twill.common.Threads;
+import org.apache.twill.filesystem.HDFSLocationFactory;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.filesystem.LocationFactory;
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.SingleRunnableApplication;
+import org.apache.twill.internal.appmaster.ApplicationMasterLiveNodeData;
+import org.apache.twill.internal.yarn.VersionDetectYarnAppClientFactory;
+import org.apache.twill.internal.yarn.YarnAppClient;
+import org.apache.twill.internal.yarn.YarnApplicationReport;
+import org.apache.twill.internal.yarn.YarnUtils;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import org.apache.twill.zookeeper.ZKOperations;
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableTable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Table;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Callables;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An implementation of {@link org.apache.twill.api.TwillRunnerService} that runs application on a YARN cluster.
+ */
+public final class YarnTwillRunnerService extends AbstractIdleService implements TwillRunnerService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(YarnTwillRunnerService.class);
+
+  private static final int ZK_TIMEOUT = 10000;
+  private static final Function<String, RunId> STRING_TO_RUN_ID = new Function<String, RunId>() {
+    @Override
+    public RunId apply(String input) {
+      return RunIds.fromString(input);
+    }
+  };
+  private static final Function<YarnTwillController, TwillController> CAST_CONTROLLER =
+    new Function<YarnTwillController, TwillController>() {
+    @Override
+    public TwillController apply(YarnTwillController controller) {
+      return controller;
+    }
+  };
+
+  private final YarnConfiguration yarnConfig;
+  private final YarnAppClient yarnAppClient;
+  private final ZKClientService zkClientService;
+  private final LocationFactory locationFactory;
+  private final Table<String, RunId, YarnTwillController> controllers;
+  private ScheduledExecutorService secureStoreScheduler;
+
+  private Iterable<LiveInfo> liveInfos;
+  private Cancellable watchCancellable;
+  private volatile String jvmOptions = "";
+
+  public YarnTwillRunnerService(YarnConfiguration config, String zkConnect) {
+    this(config, zkConnect, new HDFSLocationFactory(getFileSystem(config), "/twill"));
+  }
+
+  public YarnTwillRunnerService(YarnConfiguration config, String zkConnect, LocationFactory locationFactory) {
+    this.yarnConfig = config;
+    this.yarnAppClient = new VersionDetectYarnAppClientFactory().create(config);
+    this.locationFactory = locationFactory;
+    this.zkClientService = getZKClientService(zkConnect);
+    this.controllers = HashBasedTable.create();
+  }
+
+  /**
+   * This methods sets the extra JVM options that will be passed to the java command line for every application
+   * started through this {@link YarnTwillRunnerService} instance. It only affects applications that are started
+   * after options is set.
+   *
+   * This is intended for advance usage. All options will be passed unchanged to the java command line. Invalid
+   * options could cause application not able to start.
+   *
+   * @param options extra JVM options.
+   */
+  public void setJVMOptions(String options) {
+    Preconditions.checkArgument(options != null, "JVM options cannot be null.");
+    this.jvmOptions = options;
+  }
+
+  @Override
+  public Cancellable scheduleSecureStoreUpdate(final SecureStoreUpdater updater,
+                                               long initialDelay, long delay, TimeUnit unit) {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return new Cancellable() {
+        @Override
+        public void cancel() {
+          // No-op
+        }
+      };
+    }
+
+    synchronized (this) {
+      if (secureStoreScheduler == null) {
+        secureStoreScheduler = Executors.newSingleThreadScheduledExecutor(
+          Threads.createDaemonThreadFactory("secure-store-updater"));
+      }
+    }
+
+    final ScheduledFuture<?> future = secureStoreScheduler.scheduleWithFixedDelay(new Runnable() {
+      @Override
+      public void run() {
+        // Collects all <application, runId> pairs first
+        Multimap<String, RunId> liveApps = HashMultimap.create();
+        synchronized (YarnTwillRunnerService.this) {
+          for (Table.Cell<String, RunId, YarnTwillController> cell : controllers.cellSet()) {
+            liveApps.put(cell.getRowKey(), cell.getColumnKey());
+          }
+        }
+
+        // Collect all secure stores that needs to be updated.
+        Table<String, RunId, SecureStore> secureStores = HashBasedTable.create();
+        for (Map.Entry<String, RunId> entry : liveApps.entries()) {
+          try {
+            secureStores.put(entry.getKey(), entry.getValue(), updater.update(entry.getKey(), entry.getValue()));
+          } catch (Throwable t) {
+            LOG.warn("Exception thrown by SecureStoreUpdater {}", updater, t);
+          }
+        }
+
+        // Update secure stores.
+        updateSecureStores(secureStores);
+      }
+    }, initialDelay, delay, unit);
+
+    return new Cancellable() {
+      @Override
+      public void cancel() {
+        future.cancel(false);
+      }
+    };
+  }
+
+  @Override
+  public TwillPreparer prepare(TwillRunnable runnable) {
+    return prepare(runnable, ResourceSpecification.BASIC);
+  }
+
+  @Override
+  public TwillPreparer prepare(TwillRunnable runnable, ResourceSpecification resourceSpecification) {
+    return prepare(new SingleRunnableApplication(runnable, resourceSpecification));
+  }
+
+  @Override
+  public TwillPreparer prepare(TwillApplication application) {
+    Preconditions.checkState(isRunning(), "Service not start. Please call start() first.");
+    final TwillSpecification twillSpec = application.configure();
+    final String appName = twillSpec.getName();
+
+    return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService, locationFactory,
+                                 Suppliers.ofInstance(jvmOptions),
+                                 new YarnTwillControllerFactory() {
+      @Override
+      public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
+                                        Callable<ProcessController<YarnApplicationReport>> startUp) {
+        ZKClient zkClient = ZKClients.namespace(zkClientService, "/" + appName);
+        YarnTwillController controller = listenController(new YarnTwillController(runId, zkClient,
+                                                                                  logHandlers, startUp));
+        synchronized (YarnTwillRunnerService.this) {
+          Preconditions.checkArgument(!controllers.contains(appName, runId),
+                                      "Application %s with runId %s is already running.", appName, runId);
+          controllers.put(appName, runId, controller);
+        }
+        return controller;
+      }
+    });
+  }
+
+  @Override
+  public synchronized TwillController lookup(String applicationName, final RunId runId) {
+    return controllers.get(applicationName, runId);
+  }
+
+  @Override
+  public Iterable<TwillController> lookup(final String applicationName) {
+    return new Iterable<TwillController>() {
+      @Override
+      public Iterator<TwillController> iterator() {
+        synchronized (YarnTwillRunnerService.this) {
+          return Iterators.transform(ImmutableList.copyOf(controllers.row(applicationName).values()).iterator(),
+                                     CAST_CONTROLLER);
+        }
+      }
+    };
+  }
+
+  @Override
+  public Iterable<LiveInfo> lookupLive() {
+    return liveInfos;
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    yarnAppClient.startAndWait();
+    zkClientService.startAndWait();
+
+    // Create the root node, so that the namespace root would get created if it is missing
+    // If the exception is caused by node exists, then it's ok. Otherwise propagate the exception.
+    ZKOperations.ignoreError(zkClientService.create("/", null, CreateMode.PERSISTENT),
+                             KeeperException.NodeExistsException.class, null).get();
+
+    watchCancellable = watchLiveApps();
+    liveInfos = createLiveInfos();
+
+    // Schedule an updater for updating HDFS delegation tokens
+    if (UserGroupInformation.isSecurityEnabled()) {
+      long delay = yarnConfig.getLong(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+                                      DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+      scheduleSecureStoreUpdate(new LocationSecureStoreUpdater(yarnConfig, locationFactory),
+                                delay, delay, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    // Shutdown shouldn't stop any controllers, as stopping this client service should let the remote containers
+    // running. However, this assumes that this TwillRunnerService is a long running service and you only stop it
+    // when the JVM process is about to exit. Hence it is important that threads created in the controllers are
+    // daemon threads.
+    synchronized (this) {
+      if (secureStoreScheduler != null) {
+        secureStoreScheduler.shutdownNow();
+      }
+    }
+    watchCancellable.cancel();
+    zkClientService.stopAndWait();
+    yarnAppClient.stopAndWait();
+  }
+
+  private Cancellable watchLiveApps() {
+    final Map<String, Cancellable> watched = Maps.newConcurrentMap();
+
+    final AtomicBoolean cancelled = new AtomicBoolean(false);
+    // Watch child changes in the root, which gives all application names.
+    final Cancellable cancellable = ZKOperations.watchChildren(zkClientService, "/",
+                                                               new ZKOperations.ChildrenCallback() {
+      @Override
+      public void updated(NodeChildren nodeChildren) {
+        if (cancelled.get()) {
+          return;
+        }
+
+        Set<String> apps = ImmutableSet.copyOf(nodeChildren.getChildren());
+
+        // For each for the application name, watch for ephemeral nodes under /instances.
+        for (final String appName : apps) {
+          if (watched.containsKey(appName)) {
+            continue;
+          }
+
+          final String instancePath = String.format("/%s/instances", appName);
+          watched.put(appName,
+                      ZKOperations.watchChildren(zkClientService, instancePath, new ZKOperations.ChildrenCallback() {
+            @Override
+            public void updated(NodeChildren nodeChildren) {
+              if (cancelled.get()) {
+                return;
+              }
+              if (nodeChildren.getChildren().isEmpty()) {     // No more child, means no live instances
+                Cancellable removed = watched.remove(appName);
+                if (removed != null) {
+                  removed.cancel();
+                }
+                return;
+              }
+              synchronized (YarnTwillRunnerService.this) {
+                // For each of the children, which the node name is the runId,
+                // fetch the application Id and construct TwillController.
+                for (final RunId runId : Iterables.transform(nodeChildren.getChildren(), STRING_TO_RUN_ID)) {
+                  if (controllers.contains(appName, runId)) {
+                    continue;
+                  }
+                  updateController(appName, runId, cancelled);
+                }
+              }
+            }
+          }));
+        }
+
+        // Remove app watches for apps that are gone. Removal of controller from controllers table is done
+        // in the state listener attached to the twill controller.
+        for (String removeApp : Sets.difference(watched.keySet(), apps)) {
+          watched.remove(removeApp).cancel();
+        }
+      }
+    });
+    return new Cancellable() {
+      @Override
+      public void cancel() {
+        cancelled.set(true);
+        cancellable.cancel();
+        for (Cancellable c : watched.values()) {
+          c.cancel();
+        }
+      }
+    };
+  }
+
+  private YarnTwillController listenController(final YarnTwillController controller) {
+    controller.addListener(new ServiceListenerAdapter() {
+      @Override
+      public void terminated(State from) {
+        removeController();
+      }
+
+      @Override
+      public void failed(State from, Throwable failure) {
+        removeController();
+      }
+
+      private void removeController() {
+        synchronized (YarnTwillRunnerService.this) {
+          Iterables.removeIf(controllers.values(),
+                             new Predicate<TwillController>() {
+             @Override
+             public boolean apply(TwillController input) {
+               return input == controller;
+             }
+           });
+        }
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+    return controller;
+  }
+
+  private ZKClientService getZKClientService(String zkConnect) {
+    return ZKClientServices.delegate(
+      ZKClients.reWatchOnExpire(
+        ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnect)
+                                   .setSessionTimeout(ZK_TIMEOUT)
+                                   .build(), RetryStrategies.exponentialDelay(100, 2000, TimeUnit.MILLISECONDS))));
+  }
+
+  private Iterable<LiveInfo> createLiveInfos() {
+    return new Iterable<LiveInfo>() {
+
+      @Override
+      public Iterator<LiveInfo> iterator() {
+        Map<String, Map<RunId, YarnTwillController>> controllerMap = ImmutableTable.copyOf(controllers).rowMap();
+        return Iterators.transform(controllerMap.entrySet().iterator(),
+                                   new Function<Map.Entry<String, Map<RunId, YarnTwillController>>, LiveInfo>() {
+          @Override
+          public LiveInfo apply(final Map.Entry<String, Map<RunId, YarnTwillController>> entry) {
+            return new LiveInfo() {
+              @Override
+              public String getApplicationName() {
+                return entry.getKey();
+              }
+
+              @Override
+              public Iterable<TwillController> getControllers() {
+                return Iterables.transform(entry.getValue().values(), CAST_CONTROLLER);
+              }
+            };
+          }
+        });
+      }
+    };
+  }
+
+  private void updateController(final String appName, final RunId runId, final AtomicBoolean cancelled) {
+    String instancePath = String.format("/%s/instances/%s", appName, runId.getId());
+
+    // Fetch the content node.
+    Futures.addCallback(zkClientService.getData(instancePath), new FutureCallback<NodeData>() {
+      @Override
+      public void onSuccess(NodeData result) {
+        if (cancelled.get()) {
+          return;
+        }
+        ApplicationId appId = getApplicationId(result);
+        if (appId == null) {
+          return;
+        }
+
+        synchronized (YarnTwillRunnerService.this) {
+          if (!controllers.contains(appName, runId)) {
+            ZKClient zkClient = ZKClients.namespace(zkClientService, "/" + appName);
+            YarnTwillController controller = listenController(
+              new YarnTwillController(runId, zkClient,
+                                      Callables.returning(yarnAppClient.createProcessController(appId))));
+            controllers.put(appName, runId, controller);
+            controller.start();
+          }
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.warn("Failed in fetching application instance node.", t);
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+  }
+
+
+  /**
+   * Decodes application ID stored inside the node data.
+   * @param nodeData The node data to decode from. If it is {@code null}, this method would return {@code null}.
+   * @return The ApplicationId or {@code null} if failed to decode.
+   */
+  private ApplicationId getApplicationId(NodeData nodeData) {
+    byte[] data = nodeData == null ? null : nodeData.getData();
+    if (data == null) {
+      return null;
+    }
+
+    Gson gson = new Gson();
+    JsonElement json = gson.fromJson(new String(data, Charsets.UTF_8), JsonElement.class);
+    if (!json.isJsonObject()) {
+      LOG.warn("Unable to decode live data node.");
+      return null;
+    }
+
+    JsonObject jsonObj = json.getAsJsonObject();
+    json = jsonObj.get("data");
+    if (!json.isJsonObject()) {
+      LOG.warn("Property data not found in live data node.");
+      return null;
+    }
+
+    try {
+      ApplicationMasterLiveNodeData amLiveNode = gson.fromJson(json, ApplicationMasterLiveNodeData.class);
+      return YarnUtils.createApplicationId(amLiveNode.getAppIdClusterTime(), amLiveNode.getAppId());
+    } catch (Exception e) {
+      LOG.warn("Failed to decode application live node data.", e);
+      return null;
+    }
+  }
+
+  private void updateSecureStores(Table<String, RunId, SecureStore> secureStores) {
+    for (Table.Cell<String, RunId, SecureStore> cell : secureStores.cellSet()) {
+      Object store = cell.getValue().getStore();
+      if (!(store instanceof Credentials)) {
+        LOG.warn("Only Hadoop Credentials is supported. Ignore update for {}.", cell);
+        continue;
+      }
+
+      Credentials credentials = (Credentials) store;
+      if (credentials.getAllTokens().isEmpty()) {
+        // Nothing to update.
+        continue;
+      }
+
+      try {
+        updateCredentials(cell.getRowKey(), cell.getColumnKey(), credentials);
+        synchronized (YarnTwillRunnerService.this) {
+          // Notify the application for secure store updates if it is still running.
+          YarnTwillController controller = controllers.get(cell.getRowKey(), cell.getColumnKey());
+          if (controller != null) {
+            controller.secureStoreUpdated();
+          }
+        }
+      } catch (Throwable t) {
+        LOG.warn("Failed to update secure store for {}.", cell, t);
+      }
+    }
+  }
+
+  private void updateCredentials(String application, RunId runId, Credentials updates) throws IOException {
+    Location credentialsLocation = locationFactory.create(String.format("/%s/%s/%s", application, runId.getId(),
+                                                                        Constants.Files.CREDENTIALS));
+    // Try to read the old credentials.
+    Credentials credentials = new Credentials();
+    if (credentialsLocation.exists()) {
+      DataInputStream is = new DataInputStream(new BufferedInputStream(credentialsLocation.getInputStream()));
+      try {
+        credentials.readTokenStorageStream(is);
+      } finally {
+        is.close();
+      }
+    }
+
+    // Overwrite with the updates.
+    credentials.addAll(updates);
+
+    // Overwrite the credentials.
+    Location tmpLocation = credentialsLocation.getTempFile(Constants.Files.CREDENTIALS);
+
+    // Save the credentials store with user-only permission.
+    DataOutputStream os = new DataOutputStream(new BufferedOutputStream(tmpLocation.getOutputStream("600")));
+    try {
+      credentials.writeTokenStorageToStream(os);
+    } finally {
+      os.close();
+    }
+
+    // Rename the tmp file into the credentials location
+    tmpLocation.renameTo(credentialsLocation);
+
+    LOG.debug("Secure store for {} {} saved to {}.", application, runId, credentialsLocation.toURI());
+  }
+
+  private static FileSystem getFileSystem(YarnConfiguration configuration) {
+    try {
+      return FileSystem.get(configuration);
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/java/org/apache/twill/yarn/package-info.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/package-info.java b/twill-yarn/src/main/java/org/apache/twill/yarn/package-info.java
new file mode 100644
index 0000000..b3cbc5e
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Classes in this package implement the Twill API for Apache Hadoop YARN.
+ */
+package org.apache.twill.yarn;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/resources/logback-template.xml
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/resources/logback-template.xml b/twill-yarn/src/main/resources/logback-template.xml
new file mode 100644
index 0000000..38cf6c8
--- /dev/null
+++ b/twill-yarn/src/main/resources/logback-template.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- Default logback configuration for twill library -->
+<configuration>
+
+    <logger name="org.apache.hadoop" level="WARN" />
+    <logger name="org.apache.zookeeper" level="WARN" />
+
+    <root level="INFO" />
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/BuggyServer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/BuggyServer.java b/twill-yarn/src/test/java/org/apache/twill/yarn/BuggyServer.java
new file mode 100644
index 0000000..bb1a583
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/BuggyServer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+/**
+ * Server for testing that will die if you give it a 0.
+ */
+public final class BuggyServer extends SocketServer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BuggyServer.class);
+
+  @Override
+  public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
+    String line = reader.readLine();
+    LOG.info("Received: " + line + " going to divide by it");
+    Integer toDivide = Integer.valueOf(line);
+    writer.println(Integer.toString(100 / toDivide));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/DistributeShellTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/DistributeShellTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/DistributeShellTestRun.java
new file mode 100644
index 0000000..1054ec9
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/DistributeShellTestRun.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.common.ServiceListenerAdapter;
+import org.apache.twill.common.Threads;
+import com.google.common.util.concurrent.Service;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This test is executed by {@link YarnTestSuite}.
+ */
+public class DistributeShellTestRun {
+
+  @Ignore
+  @Test
+  public void testDistributedShell() throws InterruptedException {
+    TwillRunner twillRunner = YarnTestSuite.getTwillRunner();
+
+    TwillController controller = twillRunner.prepare(new DistributedShell("pwd", "ls -al"))
+                                            .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out)))
+                                            .start();
+
+    final CountDownLatch stopLatch = new CountDownLatch(1);
+    controller.addListener(new ServiceListenerAdapter() {
+
+      @Override
+      public void terminated(Service.State from) {
+        stopLatch.countDown();
+      }
+
+      @Override
+      public void failed(Service.State from, Throwable failure) {
+        stopLatch.countDown();
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    Assert.assertTrue(stopLatch.await(10, TimeUnit.SECONDS));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/DistributedShell.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/DistributedShell.java b/twill-yarn/src/test/java/org/apache/twill/yarn/DistributedShell.java
new file mode 100644
index 0000000..c89371c
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/DistributedShell.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.AbstractTwillRunnable;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ *
+ */
+public final class DistributedShell extends AbstractTwillRunnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedShell.class);
+
+  public DistributedShell(String...commands) {
+    super(ImmutableMap.of("cmds", Joiner.on(';').join(commands)));
+  }
+
+  @Override
+  public void run() {
+    for (String cmd : Splitter.on(';').split(getArgument("cmds"))) {
+      try {
+        Process process = new ProcessBuilder(ImmutableList.copyOf(Splitter.on(' ').split(cmd)))
+                              .redirectErrorStream(true).start();
+        BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), Charsets.US_ASCII));
+        try {
+          String line = reader.readLine();
+          while (line != null) {
+            LOG.info(line);
+            line = reader.readLine();
+          }
+        } finally {
+          reader.close();
+        }
+      } catch (IOException e) {
+        LOG.error("Fail to execute command " + cmd, e);
+      }
+    }
+  }
+
+  @Override
+  public void stop() {
+    // No-op
+  }
+}