You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/11/21 22:54:30 UTC

[07/15] Initial import commit.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
new file mode 100644
index 0000000..c1bd75a
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn.ports;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Client used by an application master to register with, request containers from and
+ * unregister from the YARN ResourceManager scheduler. Ported from Apache Hadoop YARN.
+ *
+ * <p>Container requests and releases are accumulated locally and are sent to the
+ * ResourceManager on the next call to {@link #allocate(float)}. All access to the pending
+ * state ({@code ask}, {@code release}, {@code remoteRequestsTable}) is guarded by the
+ * monitor of this instance.
+ */
+public final class AMRMClientImpl extends AbstractService implements AMRMClient {
+
+  private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
+
+  private final RecordFactory recordFactory =
+    RecordFactoryProvider.getRecordFactory(null);
+
+  // Response id of the last successful allocate call; echoed back in the next request.
+  private int lastResponseId = 0;
+
+  protected AMRMProtocol rmClient;
+  protected final ApplicationAttemptId appAttemptId;
+  protected Resource clusterAvailableResources;
+  protected int clusterNodeCount;
+
+  //Key -> Priority
+  //Value -> Map
+  //Key->ResourceName (e.g., hostname, rackname, *)
+  //Value->Map
+  //Key->Resource Capability
+  //Value->ResourceRequest
+  protected final
+  Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
+    remoteRequestsTable =
+    new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
+
+  // Requests that changed since the last successful allocate call.
+  protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
+    new BuilderUtils.ResourceRequestComparator());
+  // Containers to hand back to the ResourceManager on the next allocate call.
+  protected final Set<ContainerId> release = new TreeSet<ContainerId>();
+
+  /**
+   * Creates a client for the given application attempt.
+   *
+   * @param appAttemptId id of the application attempt this client acts for
+   */
+  public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
+    super(AMRMClientImpl.class.getName());
+    this.appAttemptId = appAttemptId;
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    super.init(conf);
+  }
+
+  /**
+   * Creates the RPC proxy to the ResourceManager scheduler address taken from the
+   * configuration. When security is enabled, the application master token is read from the
+   * environment and added to the current user before the proxy is created.
+   *
+   * @throws YarnException if the current user cannot be determined or the token cannot be decoded
+   */
+  @Override
+  public synchronized void start() {
+    final YarnConfiguration conf = new YarnConfiguration(getConfig());
+    final YarnRPC rpc = YarnRPC.create(conf);
+    final InetSocketAddress rmAddress = conf.getSocketAddr(
+      YarnConfiguration.RM_SCHEDULER_ADDRESS,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+
+    UserGroupInformation currentUser;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      String tokenURLEncodedStr = System.getenv().get(
+        ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
+      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+
+      try {
+        token.decodeFromUrlString(tokenURLEncodedStr);
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+
+      SecurityUtil.setTokenService(token, rmAddress);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AppMasterToken is " + token);
+      }
+      currentUser.addToken(token);
+    }
+
+    // Create the proxy as the current user so that any token added above is used.
+    rmClient = currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+      @Override
+      public AMRMProtocol run() {
+        return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress,
+                                           conf);
+      }
+    });
+    LOG.debug("Connecting to ResourceManager at " + rmAddress);
+    super.start();
+  }
+
+  /**
+   * Stops the RPC proxy, if one was created, and then stops this service.
+   */
+  @Override
+  public synchronized void stop() {
+    if (this.rmClient != null) {
+      RPC.stopProxy(this.rmClient);
+    }
+    super.stop();
+  }
+
+  /**
+   * Registers the application master with the ResourceManager.
+   *
+   * @param appHostName host name reported to the ResourceManager
+   * @param appHostPort RPC port reported to the ResourceManager
+   * @param appTrackingUrl tracking URL; not set on the request when {@code null}
+   * @return the response returned by the ResourceManager
+   * @throws YarnRemoteException if the remote call fails
+   */
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+    String appHostName, int appHostPort, String appTrackingUrl)
+    throws YarnRemoteException {
+    // do this only once ???
+    RegisterApplicationMasterRequest request = recordFactory
+      .newRecordInstance(RegisterApplicationMasterRequest.class);
+    synchronized (this) {
+      request.setApplicationAttemptId(appAttemptId);
+    }
+    request.setHost(appHostName);
+    request.setRpcPort(appHostPort);
+    if (appTrackingUrl != null) {
+      request.setTrackingUrl(appTrackingUrl);
+    }
+    RegisterApplicationMasterResponse response = rmClient
+      .registerApplicationMaster(request);
+    return response;
+  }
+
+  /**
+   * Sends all pending asks and releases to the ResourceManager and returns its response.
+   * The pending collections are cleared before the RPC; if the RPC does not return a
+   * response they are restored so that the next call sends them again.
+   *
+   * @param progressIndicator progress value passed through to the allocate request
+   * @return the allocation response, wrapped to hide YARN distribution differences
+   * @throws YarnRemoteException if the remote call fails
+   */
+  @Override
+  public AllocationResponse allocate(float progressIndicator)
+    throws YarnRemoteException {
+    AllocateResponse allocateResponse = null;
+    ArrayList<ResourceRequest> askList = null;
+    ArrayList<ContainerId> releaseList = null;
+    AllocateRequest allocateRequest = null;
+
+    try {
+      synchronized (this) {
+        askList = new ArrayList<ResourceRequest>(ask);
+        releaseList = new ArrayList<ContainerId>(release);
+        // optimistically clear this collection assuming no RPC failure
+        ask.clear();
+        release.clear();
+        allocateRequest = BuilderUtils
+          .newAllocateRequest(appAttemptId, lastResponseId, progressIndicator,
+                              askList, releaseList);
+      }
+
+      // The RPC is made outside the lock so requests can still be added meanwhile.
+      allocateResponse = rmClient.allocate(allocateRequest);
+      AllocationResponse response = AllocationResponses.create(allocateResponse);
+
+      synchronized (this) {
+        // update these on successful RPC
+        clusterNodeCount = allocateResponse.getNumClusterNodes();
+        lastResponseId = response.getResponseId();
+        clusterAvailableResources = response.getAvailableResources();
+      }
+
+      return response;
+    } finally {
+      // TODO how to differentiate remote YARN exception vs error in RPC
+      if (allocateResponse == null) {
+        // We hit an exception in allocate()
+        // Preserve ask and release for next call to allocate()
+        synchronized (this) {
+          release.addAll(releaseList);
+          // Requests could have been added or deleted during call to allocate.
+          // If requests were added/removed then there is nothing to do since
+          // the ResourceRequest object in ask would have the actual new value.
+          // If ask does not have this ResourceRequest then it was unchanged and
+          // so we can add the value back safely.
+          // This assumes that there will no concurrent calls to allocate() and
+          // so we don't have to worry about ask being changed in the
+          // synchronized block at the beginning of this method.
+          for (ResourceRequest oldAsk : askList) {
+            if (!ask.contains(oldAsk)) {
+              ask.add(oldAsk);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Tells the ResourceManager that the application master has finished.
+   *
+   * @param appStatus final status of the application
+   * @param appMessage diagnostics message; not set on the request when {@code null}
+   * @param appTrackingUrl tracking URL; not set on the request when {@code null}
+   * @throws YarnRemoteException if the remote call fails
+   */
+  @Override
+  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+                                          String appMessage, String appTrackingUrl) throws YarnRemoteException {
+    FinishApplicationMasterRequest request = recordFactory
+      .newRecordInstance(FinishApplicationMasterRequest.class);
+    request.setAppAttemptId(appAttemptId);
+    request.setFinishApplicationStatus(appStatus);
+    if (appMessage != null) {
+      request.setDiagnostics(appMessage);
+    }
+    if (appTrackingUrl != null) {
+      request.setTrackingUrl(appTrackingUrl);
+    }
+    rmClient.finishApplicationMaster(request);
+  }
+
+  /**
+   * Records a container request for every host and rack named in the request, plus one
+   * for the {@code ANY} location, to be sent on the next {@link #allocate(float)} call.
+   *
+   * @param req the container request to add
+   */
+  @Override
+  public synchronized void addContainerRequest(ContainerRequest req) {
+    // Create resource requests
+    if (req.hosts != null) {
+      for (String host : req.hosts) {
+        addResourceRequest(req.priority, host, req.capability, req.containerCount);
+      }
+    }
+
+    if (req.racks != null) {
+      for (String rack : req.racks) {
+        addResourceRequest(req.priority, rack, req.capability, req.containerCount);
+      }
+    }
+
+    // Off switch
+    addResourceRequest(req.priority, ANY, req.capability, req.containerCount);
+  }
+
+  /**
+   * Reverses the effect of {@link #addContainerRequest(ContainerRequest)} by decrementing
+   * the container counts for every host, rack and the {@code ANY} location of the request.
+   *
+   * @param req the container request to remove
+   */
+  @Override
+  public synchronized void removeContainerRequest(ContainerRequest req) {
+    // Update resource requests
+    if (req.hosts != null) {
+      for (String hostName : req.hosts) {
+        decResourceRequest(req.priority, hostName, req.capability, req.containerCount);
+      }
+    }
+
+    if (req.racks != null) {
+      for (String rack : req.racks) {
+        decResourceRequest(req.priority, rack, req.capability, req.containerCount);
+      }
+    }
+
+    decResourceRequest(req.priority, ANY, req.capability, req.containerCount);
+  }
+
+  /**
+   * Queues a container to be released on the next {@link #allocate(float)} call.
+   *
+   * @param containerId id of the container to release
+   */
+  @Override
+  public synchronized void releaseAssignedContainer(ContainerId containerId) {
+    release.add(containerId);
+  }
+
+  /**
+   * @return the available resources reported by the last successful allocate call,
+   *         or {@code null} if no allocate call has succeeded yet
+   */
+  @Override
+  public synchronized Resource getClusterAvailableResources() {
+    return clusterAvailableResources;
+  }
+
+  /**
+   * @return the cluster node count reported by the last successful allocate call,
+   *         or {@code 0} if no allocate call has succeeded yet
+   */
+  @Override
+  public synchronized int getClusterNodeCount() {
+    return clusterNodeCount;
+  }
+
+  /**
+   * Puts the given request into {@code ask}, replacing any entry the comparator considers equal.
+   */
+  private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
+    // This code looks weird but is needed because of the following scenario.
+    // A ResourceRequest is removed from the remoteRequestTable. A 0 container
+    // request is added to 'ask' to notify the RM about not needing it any more.
+    // Before the call to allocate, the user now requests more containers. If
+    // the locations of the 0 size request and the new request are the same
+    // (with the difference being only container count), then the set comparator
+    // will consider both to be the same and not add the new request to ask. So
+    // we need to check for the "same" request being present and remove it and
+    // then add it back. The comparator is container count agnostic.
+    // This should happen only rarely but we do need to guard against it.
+    if (ask.contains(remoteRequest)) {
+      ask.remove(remoteRequest);
+    }
+    ask.add(remoteRequest);
+  }
+
+  /**
+   * Increases the container count of the request identified by priority, resource name and
+   * capability, creating the table entries on demand, and queues the request in {@code ask}.
+   */
+  private void addResourceRequest(Priority priority, String resourceName,
+                                  Resource capability, int containerCount) {
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+      this.remoteRequestsTable.get(priority);
+    if (remoteRequests == null) {
+      remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
+      this.remoteRequestsTable.put(priority, remoteRequests);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added priority=" + priority);
+      }
+    }
+    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      reqMap = new HashMap<Resource, ResourceRequest>();
+      remoteRequests.put(resourceName, reqMap);
+    }
+    ResourceRequest remoteRequest = reqMap.get(capability);
+    if (remoteRequest == null) {
+      remoteRequest = BuilderUtils.
+        newResourceRequest(priority, resourceName, capability, 0);
+      reqMap.put(capability, remoteRequest);
+    }
+
+    remoteRequest.setNumContainers(remoteRequest.getNumContainers() + containerCount);
+
+    // Note this down for next interaction with ResourceManager
+    addResourceRequestToAsk(remoteRequest);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("addResourceRequest:" + " applicationId="
+                  + appAttemptId + " priority=" + priority.getPriority()
+                  + " resourceName=" + resourceName + " numContainers="
+                  + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
+  }
+
+  /**
+   * Decreases the container count of the request identified by priority, resource name and
+   * capability (never below zero), queues the updated request in {@code ask} and prunes
+   * table entries that became empty. Does nothing if no such request is in the table.
+   */
+  private void decResourceRequest(Priority priority, String resourceName,
+                                  Resource capability, int containerCount) {
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+      this.remoteRequestsTable.get(priority);
+
+    if (remoteRequests == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as priority " + priority
+                    + " is not present in request table");
+      }
+      return;
+    }
+
+    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as " + resourceName
+                    + " is not present in request table");
+      }
+      return;
+    }
+    ResourceRequest remoteRequest = reqMap.get(capability);
+    if (remoteRequest == null) {
+      // Same treatment as an unknown priority or resource name: a removal for a
+      // capability that was never requested must not fail with a NullPointerException.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as capability " + capability
+                    + " is not present in request table");
+      }
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+                  + appAttemptId + " priority=" + priority.getPriority()
+                  + " resourceName=" + resourceName + " numContainers="
+                  + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
+
+    remoteRequest.
+      setNumContainers(remoteRequest.getNumContainers() - containerCount);
+    if (remoteRequest.getNumContainers() < 0) {
+      // guard against spurious removals
+      remoteRequest.setNumContainers(0);
+    }
+    // Send the ResourceRequest to RM even if is 0 because it needs to override
+    // a previously sent value. If ResourceRequest was not sent previously then
+    // sending 0 ought to be a no-op on RM.
+    addResourceRequestToAsk(remoteRequest);
+
+    // Delete entries from map if no longer needed.
+    if (remoteRequest.getNumContainers() == 0) {
+      reqMap.remove(capability);
+      if (reqMap.size() == 0) {
+        remoteRequests.remove(resourceName);
+      }
+      if (remoteRequests.size() == 0) {
+        remoteRequestsTable.remove(priority);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      // Logged at debug level to match the guard and the BEFORE message above.
+      LOG.debug("AFTER decResourceRequest:" + " applicationId="
+                  + appAttemptId + " priority=" + priority.getPriority()
+                  + " resourceName=" + resourceName + " numContainers="
+                  + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
new file mode 100644
index 0000000..89734fc
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn.ports;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.util.List;
+
+/**
+ * This interface abstracts the differences between the allocate response of
+ * vanilla Hadoop YARN 2.0 and that of CDH 4.4, so that callers can read the
+ * response through a single set of accessors.
+ */
+public interface AllocationResponse {
+
+  /**
+   * @return the response id carried by the underlying allocate response
+   */
+  int getResponseId();
+
+  /**
+   * @return the available resources carried by the underlying allocate response
+   */
+  Resource getAvailableResources();
+
+  /**
+   * @return the containers listed as allocated in the underlying allocate response
+   */
+  List<Container> getAllocatedContainers();
+
+  /**
+   * @return the statuses of completed containers listed in the underlying allocate response
+   */
+  List<ContainerStatus> getCompletedContainersStatuses();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
new file mode 100644
index 0000000..ea46c3b
--- /dev/null
+++ b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn.ports;
+
+import com.google.common.base.Throwables;
+import com.google.common.reflect.TypeToken;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.util.List;
+
+/**
+ * Factory for building instance of {@link AllocationResponse} based on the response type.
+ *
+ * <p>Which shape of Hadoop's {@code AllocateResponse} is on the classpath is detected once,
+ * by reflection, when this class is initialized.
+ */
+public final class AllocationResponses {
+
+  /**
+   * A hack for CDH 4.4.0, as the AllocateResponse class is being rewritten and diverted from YARN 2.0
+   */
+  private static final boolean IS_CDH_4_4;
+
+  static {
+    boolean result = false;
+    try {
+      try {
+        // See if it is standard YARN 2.0 AllocateResponse object.
+        AllocateResponse.class.getMethod("getAMResponse");
+      } catch (NoSuchMethodException e) {
+        // See if it is CDH 4.4 AllocateResponse object. The probe must target Hadoop's
+        // AllocateResponse; probing Twill's own AllocationResponse interface would always
+        // succeed and make this check meaningless.
+        AllocateResponse.class.getMethod("getAllocatedContainers");
+        result = true;
+      }
+    } catch (Exception e) {
+      // Something very wrong in here, as it shouldn't arrive here.
+      e.printStackTrace();
+      throw Throwables.propagate(e);
+    }
+
+    IS_CDH_4_4 = result;
+  }
+
+  /**
+   * Wraps the given allocate response in an {@link AllocationResponse}.
+   *
+   * <p>For the CDH 4.4 shape the response object itself is wrapped; otherwise the object
+   * returned by its {@code getAMResponse()} method is wrapped.
+   *
+   * @param response the allocate response object returned by the ResourceManager
+   * @return a view over the response whose accessors are resolved by reflection
+   */
+  public static AllocationResponse create(Object response) {
+    if (IS_CDH_4_4) {
+      return new ReflectionAllocationResponse(response);
+    }
+
+    try {
+      Object amResponse = response.getClass().getMethod("getAMResponse").invoke(response);
+      return new ReflectionAllocationResponse(amResponse);
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * {@link AllocationResponse} that forwards every accessor to the same-named public
+   * no-argument method of the wrapped object via reflection.
+   */
+  private static final class ReflectionAllocationResponse implements AllocationResponse {
+
+    private final Object response;
+
+    private ReflectionAllocationResponse(Object response) {
+      this.response = response;
+    }
+
+    @Override
+    public int getResponseId() {
+      return call("getResponseId", TypeToken.of(Integer.class));
+    }
+
+    @Override
+    public Resource getAvailableResources() {
+      return call("getAvailableResources", TypeToken.of(Resource.class));
+    }
+
+    @Override
+    public List<Container> getAllocatedContainers() {
+      return call("getAllocatedContainers", new TypeToken<List<Container>>() {});
+    }
+
+    @Override
+    public List<ContainerStatus> getCompletedContainersStatuses() {
+      return call("getCompletedContainersStatuses", new TypeToken<List<ContainerStatus>>() {});
+    }
+
+    /**
+     * Invokes the named no-argument method on the wrapped response.
+     *
+     * @param methodName name of the public method to invoke
+     * @param resultType expected result type; only its raw type is checked at runtime
+     * @return the value returned by the method
+     */
+    // The raw type is verified by Class.cast; type arguments (e.g. the element type of a
+    // List) cannot be checked at runtime, so the cast to T is unavoidably unchecked.
+    @SuppressWarnings("unchecked")
+    private <T> T call(String methodName, TypeToken<T> resultType) {
+      try {
+        return (T) resultType.getRawType().cast(response.getClass().getMethod(methodName).invoke(response));
+      } catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+
+  private AllocationResponses() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
new file mode 100644
index 0000000..ce8f90f
--- /dev/null
+++ b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * A {@link YarnAMClient} backed by Hadoop's {@link AMRMClient} from the
+ * {@code org.apache.hadoop.yarn.client.api} package.
+ *
+ * <p>The tracker address and URL must be supplied through
+ * {@link #setTracker(InetSocketAddress, URL)} before the service is started.
+ */
+public final class Hadoop21YarnAMClient extends AbstractIdleService implements YarnAMClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAMClient.class);
+
+  private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
+
+  static {
+    STATUS_TRANSFORM = new Function<ContainerStatus, YarnContainerStatus>() {
+      @Override
+      public YarnContainerStatus apply(ContainerStatus status) {
+        return new Hadoop21YarnContainerStatus(status);
+      }
+    };
+  }
+
+  private final ContainerId containerId;
+  // Outstanding requests keyed by the id returned from ContainerRequestBuilder.apply().
+  private final Multimap<String, AMRMClient.ContainerRequest> containerRequests;
+  private final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
+  private final Hadoop21YarnNMClient nmClient;
+  private InetSocketAddress trackerAddr;
+  private URL trackerUrl;
+  // Set from the registration response in startUp().
+  private Resource maxCapability;
+
+  /**
+   * Creates the client, reading the id of the application master container from the
+   * {@code CONTAINER_ID} environment variable.
+   *
+   * @param conf configuration used to initialize the underlying YARN clients
+   * @throws IllegalArgumentException if the container id is missing from the environment
+   */
+  public Hadoop21YarnAMClient(Configuration conf) {
+    String masterContainerId = System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.name());
+    Preconditions.checkArgument(masterContainerId != null,
+                                "Missing %s from environment", ApplicationConstants.Environment.CONTAINER_ID.name());
+    this.containerId = ConverterUtils.toContainerId(masterContainerId);
+    this.containerRequests = ArrayListMultimap.create();
+
+    this.amrmClient = AMRMClient.createAMRMClient();
+    this.amrmClient.init(conf);
+    this.nmClient = new Hadoop21YarnNMClient(conf);
+  }
+
+  /**
+   * Starts the AMRM client, registers the application master using the tracker address
+   * and URL, remembers the maximum resource capability from the registration response,
+   * and starts the node manager client.
+   */
+  @Override
+  protected void startUp() throws Exception {
+    Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
+    Preconditions.checkNotNull(trackerUrl, "Tracker URL not set.");
+
+    amrmClient.start();
+    RegisterApplicationMasterResponse response = amrmClient.registerApplicationMaster(trackerAddr.getHostName(),
+                                                                                      trackerAddr.getPort(),
+                                                                                      trackerUrl.toString());
+    maxCapability = response.getMaximumResourceCapability();
+    nmClient.startAndWait();
+  }
+
+  /**
+   * Stops the node manager client, unregisters the application master with status
+   * {@code SUCCEEDED} and stops the AMRM client.
+   */
+  @Override
+  protected void shutDown() throws Exception {
+    nmClient.stopAndWait();
+    amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, trackerUrl.toString());
+    amrmClient.stop();
+  }
+
+  @Override
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  /**
+   * @return the value of the {@code NM_HOST} environment variable, or {@code null} if it is not set
+   */
+  @Override
+  public String getHost() {
+    return System.getenv().get(ApplicationConstants.Environment.NM_HOST.name());
+  }
+
+  @Override
+  public void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
+    this.trackerAddr = trackerAddr;
+    this.trackerUrl = trackerUrl;
+  }
+
+  /**
+   * Performs one allocate call and reports the outcome to the handler.
+   *
+   * <p>Newly allocated containers are offered to {@link AllocateHandler#acquired}; any
+   * container that was not used to launch a process is released afterwards. Completed
+   * container statuses, if any, are passed to {@link AllocateHandler#completed}.
+   *
+   * @param progress progress value passed through to the allocate call
+   * @param handler callback receiving acquired launchers and completed statuses
+   */
+  @Override
+  public synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
+    AllocateResponse allocateResponse = amrmClient.allocate(progress);
+    List<ProcessLauncher<YarnContainerInfo>> launchers
+      = Lists.newArrayListWithCapacity(allocateResponse.getAllocatedContainers().size());
+
+    for (Container container : allocateResponse.getAllocatedContainers()) {
+      launchers.add(new RunnableProcessLauncher(new Hadoop21YarnContainerInfo(container), nmClient));
+    }
+
+    if (!launchers.isEmpty()) {
+      handler.acquired(launchers);
+
+      // If no process has been launched through the given launcher, return the container.
+      for (ProcessLauncher<YarnContainerInfo> l : launchers) {
+        // This cast always works.
+        RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
+        if (!launcher.isLaunched()) {
+          Container container = launcher.getContainerInfo().getContainer();
+          LOG.info("Nothing to run in container, releasing it: {}", container);
+          amrmClient.releaseAssignedContainer(container.getId());
+        }
+      }
+    }
+
+    List<YarnContainerStatus> completed = ImmutableList.copyOf(
+      Iterables.transform(allocateResponse.getCompletedContainersStatuses(), STATUS_TRANSFORM));
+    if (!completed.isEmpty()) {
+      handler.completed(completed);
+    }
+  }
+
+  @Override
+  public ContainerRequestBuilder addContainerRequest(Resource capability) {
+    return addContainerRequest(capability, 1);
+  }
+
+  /**
+   * Creates a builder for requesting {@code count} containers of the given capability.
+   * The capability is first capped at the maximum reported by the ResourceManager. When
+   * the builder is applied, one request per container is submitted and all of them are
+   * tracked under a single generated id.
+   *
+   * @param capability requested resources; adjusted in place if it exceeds the maximum
+   * @param count number of containers to request
+   * @return a builder whose {@code apply()} submits the requests and returns their id
+   */
+  @Override
+  public ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
+    return new ContainerRequestBuilder(adjustCapability(capability), count) {
+      @Override
+      public String apply() {
+        synchronized (Hadoop21YarnAMClient.this) {
+          String id = UUID.randomUUID().toString();
+
+          String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
+          String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
+
+          for (int i = 0; i < count; i++) {
+            AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, hosts, racks, priority);
+            containerRequests.put(id, request);
+            amrmClient.addContainerRequest(request);
+          }
+
+          return id;
+        }
+      }
+    };
+  }
+
+  /**
+   * Withdraws every container request that was submitted under the given id.
+   *
+   * @param id the id returned when the requests were submitted
+   */
+  @Override
+  public synchronized void completeContainerRequest(String id) {
+    for (AMRMClient.ContainerRequest request : containerRequests.removeAll(id)) {
+      amrmClient.removeContainerRequest(request);
+    }
+  }
+
+  /**
+   * Caps the virtual cores and memory of the given resource at the maximum capability
+   * reported by the ResourceManager, modifying the resource in place.
+   *
+   * @param resource the requested resource
+   * @return the same resource instance, possibly with reduced cores and/or memory
+   */
+  private Resource adjustCapability(Resource resource) {
+    int cores = resource.getVirtualCores();
+    int updatedCores = Math.min(cores, maxCapability.getVirtualCores());
+
+    if (cores != updatedCores) {
+      resource.setVirtualCores(updatedCores);
+      LOG.info("Adjust virtual cores requirement from {} to {}.", cores, updatedCores);
+    }
+
+    // Capture the original memory before it is overwritten so the log shows the real change.
+    int memory = resource.getMemory();
+    int updatedMemory = Math.min(memory, maxCapability.getMemory());
+    if (memory != updatedMemory) {
+      resource.setMemory(updatedMemory);
+      LOG.info("Adjust memory requirement from {} to {} MB.", memory, updatedMemory);
+    }
+
+    return resource;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
new file mode 100644
index 0000000..50b212d
--- /dev/null
+++ b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationSubmitter;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public final class Hadoop21YarnAppClient extends AbstractIdleService implements YarnAppClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAppClient.class);
+  private final YarnClient yarnClient;
+
+  public Hadoop21YarnAppClient(Configuration configuration) {
+    this.yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(configuration);
+  }
+
+  @Override
+  public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception {
+    // Request for new application
+    YarnClientApplication application = yarnClient.createApplication();
+    final GetNewApplicationResponse response = application.getNewApplicationResponse();
+    final ApplicationId appId = response.getApplicationId();
+
+    // Setup the context for application submission
+    final ApplicationSubmissionContext appSubmissionContext = application.getApplicationSubmissionContext();
+    appSubmissionContext.setApplicationId(appId);
+    appSubmissionContext.setApplicationName(twillSpec.getName());
+
+    ApplicationSubmitter submitter = new ApplicationSubmitter() {
+      @Override
+      public ProcessController<YarnApplicationReport> submit(YarnLaunchContext context, Resource capability) {
+        ContainerLaunchContext launchContext = context.getLaunchContext();
+
+        addRMToken(launchContext);
+        appSubmissionContext.setAMContainerSpec(launchContext);
+        appSubmissionContext.setResource(adjustMemory(response, capability));
+        appSubmissionContext.setMaxAppAttempts(2);
+
+        try {
+          yarnClient.submitApplication(appSubmissionContext);
+          return new ProcessControllerImpl(yarnClient, appId);
+        } catch (Exception e) {
+          LOG.error("Failed to submit application {}", appId, e);
+          throw Throwables.propagate(e);
+        }
+      }
+    };
+
+    return new ApplicationMasterProcessLauncher(appId, submitter);
+  }
+
+  private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
+    int maxMemory = response.getMaximumResourceCapability().getMemory();
+    int updatedMemory = capability.getMemory();
+
+    if (updatedMemory > maxMemory) {
+      capability.setMemory(maxMemory);
+    }
+
+    return capability;
+  }
+
+  private void addRMToken(ContainerLaunchContext context) {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+
+    try {
+      Credentials credentials = YarnUtils.decodeCredentials(context.getTokens());
+
+      Configuration config = yarnClient.getConfig();
+      Token<TokenIdentifier> token = ConverterUtils.convertFromYarn(
+        yarnClient.getRMDelegationToken(new Text(YarnUtils.getYarnTokenRenewer(config))),
+        YarnUtils.getRMAddress(config));
+
+      LOG.info("Added RM delegation token {}", token);
+      credentials.addToken(token.getService(), token);
+
+      context.setTokens(YarnUtils.encodeCredentials(credentials));
+
+    } catch (Exception e) {
+      LOG.error("Fails to create credentials.", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception {
+    // Ignore user
+    return createLauncher(twillSpec);
+  }
+
+  @Override
+  public ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId) {
+    return new ProcessControllerImpl(yarnClient, appId);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    yarnClient.start();
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    yarnClient.stop();
+  }
+
+  private static final class ProcessControllerImpl implements ProcessController<YarnApplicationReport> {
+    private final YarnClient yarnClient;
+    private final ApplicationId appId;
+
+    public ProcessControllerImpl(YarnClient yarnClient, ApplicationId appId) {
+      this.yarnClient = yarnClient;
+      this.appId = appId;
+    }
+
+    @Override
+    public YarnApplicationReport getReport() {
+      try {
+        return new Hadoop21YarnApplicationReport(yarnClient.getApplicationReport(appId));
+      } catch (Exception e) {
+        LOG.error("Failed to get application report {}", appId, e);
+        throw Throwables.propagate(e);
+      }
+    }
+
+    @Override
+    public void cancel() {
+      try {
+        yarnClient.killApplication(appId);
+      } catch (Exception e) {
+        LOG.error("Failed to kill application {}", appId, e);
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java
new file mode 100644
index 0000000..6e614f5
--- /dev/null
+++ b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+/**
+ *
+ */
+public final class Hadoop21YarnApplicationReport implements YarnApplicationReport {
+
+  private final ApplicationReport report;
+
+  public Hadoop21YarnApplicationReport(ApplicationReport report) {
+    this.report = report;
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    return report.getApplicationId();
+  }
+
+  @Override
+  public ApplicationAttemptId getCurrentApplicationAttemptId() {
+    return report.getCurrentApplicationAttemptId();
+  }
+
+  @Override
+  public String getQueue() {
+    return report.getQueue();
+  }
+
+  @Override
+  public String getName() {
+    return report.getName();
+  }
+
+  @Override
+  public String getHost() {
+    return report.getHost();
+  }
+
+  @Override
+  public int getRpcPort() {
+    return report.getRpcPort();
+  }
+
+  @Override
+  public YarnApplicationState getYarnApplicationState() {
+    return report.getYarnApplicationState();
+  }
+
+  @Override
+  public String getDiagnostics() {
+    return report.getDiagnostics();
+  }
+
+  @Override
+  public String getTrackingUrl() {
+    return report.getTrackingUrl();
+  }
+
+  @Override
+  public String getOriginalTrackingUrl() {
+    return report.getOriginalTrackingUrl();
+  }
+
+  @Override
+  public long getStartTime() {
+    return report.getStartTime();
+  }
+
+  @Override
+  public long getFinishTime() {
+    return report.getFinishTime();
+  }
+
+  @Override
+  public FinalApplicationStatus getFinalApplicationStatus() {
+    return report.getFinalApplicationStatus();
+  }
+
+  @Override
+  public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
+    return report.getApplicationResourceUsageReport();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
new file mode 100644
index 0000000..86903c1
--- /dev/null
+++ b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ *
+ */
+public final class Hadoop21YarnContainerInfo implements YarnContainerInfo {
+
+  private final Container container;
+
+  public Hadoop21YarnContainerInfo(Container container) {
+    this.container = container;
+  }
+
+  @Override
+  public <T> T getContainer() {
+    return (T) container;
+  }
+
+  @Override
+  public String getId() {
+    return container.getId().toString();
+  }
+
+  @Override
+  public InetAddress getHost() {
+    try {
+      return InetAddress.getByName(container.getNodeId().getHost());
+    } catch (UnknownHostException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public int getPort() {
+    return container.getNodeId().getPort();
+  }
+
+  @Override
+  public int getMemoryMB() {
+    return container.getResource().getMemory();
+  }
+
+  @Override
+  public int getVirtualCores() {
+    return container.getResource().getVirtualCores();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java
new file mode 100644
index 0000000..f5758c7
--- /dev/null
+++ b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+/**
+ * A {@link YarnContainerStatus} that forwards to a wrapped YARN {@link ContainerStatus}.
+ */
+public final class Hadoop21YarnContainerStatus implements YarnContainerStatus {
+
+  /** The status every getter delegates to. */
+  private final ContainerStatus delegate;
+
+  /**
+   * @param containerStatus the YARN container status to expose
+   */
+  public Hadoop21YarnContainerStatus(ContainerStatus containerStatus) {
+    this.delegate = containerStatus;
+  }
+
+  /**
+   * @return the string form of the container id
+   */
+  @Override
+  public String getContainerId() {
+    return delegate.getContainerId().toString();
+  }
+
+  @Override
+  public ContainerState getState() {
+    return delegate.getState();
+  }
+
+  @Override
+  public int getExitStatus() {
+    return delegate.getExitStatus();
+  }
+
+  @Override
+  public String getDiagnostics() {
+    return delegate.getDiagnostics();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java
new file mode 100644
index 0000000..8621f93
--- /dev/null
+++ b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public final class Hadoop21YarnLaunchContext implements YarnLaunchContext {
+
+  private static final Function<YarnLocalResource, LocalResource> RESOURCE_TRANSFORM;
+
+  static {
+    // Creates transform function from YarnLocalResource -> LocalResource
+    RESOURCE_TRANSFORM = new Function<YarnLocalResource, LocalResource>() {
+      @Override
+      public LocalResource apply(YarnLocalResource input) {
+        return input.getLocalResource();
+      }
+    };
+  }
+
+  private final ContainerLaunchContext launchContext;
+
+  public Hadoop21YarnLaunchContext() {
+    launchContext = Records.newRecord(ContainerLaunchContext.class);
+  }
+
+  @Override
+  public <T> T getLaunchContext() {
+    return (T) launchContext;
+  }
+
+  @Override
+  public void setCredentials(Credentials credentials) {
+    launchContext.setTokens(YarnUtils.encodeCredentials(credentials));
+  }
+
+  @Override
+  public void setLocalResources(Map<String, YarnLocalResource> localResources) {
+    launchContext.setLocalResources(Maps.transformValues(localResources, RESOURCE_TRANSFORM));
+  }
+
+  @Override
+  public void setServiceData(Map<String, ByteBuffer> serviceData) {
+    launchContext.setServiceData(serviceData);
+  }
+
+  @Override
+  public Map<String, String> getEnvironment() {
+    return launchContext.getEnvironment();
+  }
+
+  @Override
+  public void setEnvironment(Map<String, String> environment) {
+    launchContext.setEnvironment(environment);
+  }
+
+  @Override
+  public List<String> getCommands() {
+    return launchContext.getCommands();
+  }
+
+  @Override
+  public void setCommands(List<String> commands) {
+    launchContext.setCommands(commands);
+  }
+
+  @Override
+  public void setApplicationACLs(Map<ApplicationAccessType, String> acls) {
+    launchContext.setApplicationACLs(acls);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java
new file mode 100644
index 0000000..3f756bd
--- /dev/null
+++ b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * A {@link YarnLocalResource} that stores its state in a YARN {@link LocalResource} record
+ * created at construction time.
+ */
+public final class Hadoop21YarnLocalResource implements YarnLocalResource {
+
+  /** The record every accessor reads from and writes to. */
+  private final LocalResource delegate;
+
+  /**
+   * Creates an empty local resource record.
+   */
+  public Hadoop21YarnLocalResource() {
+    delegate = Records.newRecord(LocalResource.class);
+  }
+
+  /**
+   * Returns the wrapped record, cast to whatever type the caller asks for.
+   */
+  @Override
+  public <T> T getLocalResource() {
+    return (T) delegate;
+  }
+
+  @Override
+  public URL getResource() {
+    return delegate.getResource();
+  }
+
+  @Override
+  public void setResource(URL resource) {
+    delegate.setResource(resource);
+  }
+
+  @Override
+  public long getSize() {
+    return delegate.getSize();
+  }
+
+  @Override
+  public void setSize(long size) {
+    delegate.setSize(size);
+  }
+
+  @Override
+  public long getTimestamp() {
+    return delegate.getTimestamp();
+  }
+
+  @Override
+  public void setTimestamp(long timestamp) {
+    delegate.setTimestamp(timestamp);
+  }
+
+  @Override
+  public LocalResourceType getType() {
+    return delegate.getType();
+  }
+
+  @Override
+  public void setType(LocalResourceType type) {
+    delegate.setType(type);
+  }
+
+  @Override
+  public LocalResourceVisibility getVisibility() {
+    return delegate.getVisibility();
+  }
+
+  @Override
+  public void setVisibility(LocalResourceVisibility visibility) {
+    delegate.setVisibility(visibility);
+  }
+
+  @Override
+  public String getPattern() {
+    return delegate.getPattern();
+  }
+
+  @Override
+  public void setPattern(String pattern) {
+    delegate.setPattern(pattern);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
new file mode 100644
index 0000000..d3a6a80
--- /dev/null
+++ b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.common.Cancellable;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link YarnNMClient} backed by an {@link NMClient}. The wrapped client is initialized in the
+ * constructor, started in {@link #startUp()} and stopped in {@link #shutDown()}.
+ */
+public final class Hadoop21YarnNMClient extends AbstractIdleService implements YarnNMClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnNMClient.class);
+
+  /** Pause between two container status polls while waiting for a container to complete. */
+  private static final long STATUS_POLL_INTERVAL_MS = 200L;
+
+  private final NMClient nmClient;
+
+  /**
+   * Creates the underlying {@link NMClient} and initializes it with the given configuration.
+   *
+   * @param configuration configuration handed to {@link NMClient#init(Configuration)}
+   */
+  public Hadoop21YarnNMClient(Configuration configuration) {
+    this.nmClient = NMClient.createNMClient();
+    nmClient.init(configuration);
+  }
+
+  /**
+   * Starts the given container with the given launch context.
+   *
+   * @param containerInfo describes the container to start
+   * @param launchContext the launch context to start the container with
+   * @return a {@link Cancellable} that stops the container and waits for it to complete
+   */
+  @Override
+  public Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext launchContext) {
+    try {
+      Container container = containerInfo.getContainer();
+      nmClient.startContainer(container, launchContext.<ContainerLaunchContext>getLaunchContext());
+      return new ContainerTerminator(container, nmClient);
+    } catch (Exception e) {
+      LOG.error("Error in launching process", e);
+      throw Throwables.propagate(e);
+    }
+
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    nmClient.start();
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    nmClient.stop();
+  }
+
+  /**
+   * {@link Cancellable} that stops a container and blocks until it is reported as complete.
+   */
+  private static final class ContainerTerminator implements Cancellable {
+
+    private final Container container;
+    private final NMClient nmClient;
+
+    private ContainerTerminator(Container container, NMClient nmClient) {
+      this.container = container;
+      this.nmClient = nmClient;
+    }
+
+    /**
+     * Requests the container to stop, then polls its status until it reaches
+     * {@link ContainerState#COMPLETE}, pausing {@code STATUS_POLL_INTERVAL_MS} between polls.
+     * Any failure, including interruption while waiting, is logged and rethrown through
+     * {@link Throwables#propagate}.
+     */
+    @Override
+    public void cancel() {
+      LOG.info("Request to stop container {}.", container.getId());
+
+      try {
+        nmClient.stopContainer(container.getId(), container.getNodeId());
+        while (true) {
+          ContainerStatus status = nmClient.getContainerStatus(container.getId(), container.getNodeId());
+          LOG.info("Container status: {} {}", status, status.getDiagnostics());
+
+          if (status.getState() == ContainerState.COMPLETE) {
+            break;
+          }
+          // Pause before asking again so the status call and the log are not hit in a tight loop.
+          Thread.sleep(STATUS_POLL_INTERVAL_MS);
+        }
+        LOG.info("Container {} stopped.", container.getId());
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so code further up the stack can still observe it.
+        Thread.currentThread().interrupt();
+        LOG.error("Fail to stop container {}", container.getId(), e);
+        throw Throwables.propagate(e);
+      } catch (Exception e) {
+        LOG.error("Fail to stop container {}", container.getId(), e);
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java b/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
new file mode 100644
index 0000000..b0eeb43
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.filesystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.UUID;
+
+/**
+ * A concrete implementation of {@link Location} for the HDFS filesystem.
+ */
+final class HDFSLocation implements Location {
+  private final FileSystem fs;
+  private final Path path;
+
+  /**
+   * Constructs a HDFSLocation.
+   *
+   * @param fs  An instance of {@link FileSystem}
+   * @param path of the file.
+   */
+  HDFSLocation(FileSystem fs, Path path) {
+    this.fs = fs;
+    this.path = path;
+  }
+
+  /**
+   * Checks if this location exists on HDFS.
+   *
+   * @return true if found; false otherwise.
+   * @throws IOException
+   */
+  @Override
+  public boolean exists() throws IOException {
+    return fs.exists(path);
+  }
+
+  /**
+   * @return An {@link InputStream} for this location on HDFS.
+   * @throws IOException
+   */
+  @Override
+  public InputStream getInputStream() throws IOException {
+    return fs.open(path);
+  }
+
+  /**
+   * @return An {@link OutputStream} for this location on HDFS.
+   * @throws IOException
+   */
+  @Override
+  public OutputStream getOutputStream() throws IOException {
+    return fs.create(path);
+  }
+
+  @Override
+  public OutputStream getOutputStream(String permission) throws IOException {
+    Configuration conf = fs.getConf();
+    return fs.create(path,
+                     new FsPermission(permission),
+                     true,
+                     conf.getInt("io.file.buffer.size", 4096),
+                     fs.getDefaultReplication(path),
+                     fs.getDefaultBlockSize(path),
+                     null);
+  }
+
+  /**
+   * Appends the child to the current {@link Location} on HDFS.
+   * <p>
+   * Returns a new instance of Location.
+   * </p>
+   *
+   * @param child to be appended to this location.
+   * @return A new instance of {@link Location}
+   * @throws IOException
+   */
+  @Override
+  public Location append(String child) throws IOException {
+    if (child.startsWith("/")) {
+      child = child.substring(1);
+    }
+    return new HDFSLocation(fs, new Path(URI.create(path.toUri() + "/" + child)));
+  }
+
+  @Override
+  public Location getTempFile(String suffix) throws IOException {
+    Path path = new Path(
+      URI.create(this.path.toUri() + "." + UUID.randomUUID() + (suffix == null ? TEMP_FILE_SUFFIX : suffix)));
+    return new HDFSLocation(fs, path);
+  }
+
+  /**
+   * @return Returns the name of the file or directory denoteed by this abstract pathname.
+   */
+  @Override
+  public String getName() {
+    return path.getName();
+  }
+
+  @Override
+  public boolean createNew() throws IOException {
+    return fs.createNewFile(path);
+  }
+
+  /**
+   * @return A {@link URI} for this location on HDFS.
+   */
+  @Override
+  public URI toURI() {
+    return path.toUri();
+  }
+
+  /**
+   * Deletes the file or directory denoted by this abstract pathname. If this
+   * pathname denotes a directory, then the directory must be empty in order
+   * to be deleted.
+   *
+   * @return true if and only if the file or directory is successfully deleted; false otherwise.
+   */
+  @Override
+  public boolean delete() throws IOException {
+    return fs.delete(path, false);
+  }
+
+  @Override
+  public boolean delete(boolean recursive) throws IOException {
+    return fs.delete(path, true);
+  }
+
+  @Override
+  public Location renameTo(Location destination) throws IOException {
+    // Destination will always be of the same type as this location.
+    if (fs instanceof DistributedFileSystem) {
+      ((DistributedFileSystem) fs).rename(path, ((HDFSLocation) destination).path, Options.Rename.OVERWRITE);
+      return new HDFSLocation(fs, new Path(destination.toURI()));
+    }
+
+    if (fs.rename(path, ((HDFSLocation) destination).path)) {
+      return new HDFSLocation(fs, new Path(destination.toURI()));
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Creates the directory named by this abstract pathname, including any necessary
+   * but nonexistent parent directories.
+   *
+   * @return true if and only if the renaming succeeded; false otherwise
+   */
+  @Override
+  public boolean mkdirs() throws IOException {
+    return fs.mkdirs(path);
+  }
+
+  /**
+   * @return Length of file.
+   */
+  @Override
+  public long length() throws IOException {
+    return fs.getFileStatus(path).getLen();
+  }
+
+  @Override
+  public long lastModified() throws IOException {
+    return fs.getFileStatus(path).getModificationTime();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java b/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
new file mode 100644
index 0000000..fa79391
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.filesystem;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A {@link LocationFactory} that creates {@link Location} instances backed by a Hadoop {@link FileSystem}.
+ */
+public final class HDFSLocationFactory implements LocationFactory {
+
+  private final FileSystem fileSystem;
+  private final String pathBase;
+
+  public HDFSLocationFactory(Configuration configuration) {
+    this(getFileSystem(configuration));
+  }
+
+  public HDFSLocationFactory(Configuration configuration, String pathBase) {
+    this(getFileSystem(configuration), pathBase);
+  }
+
+  public HDFSLocationFactory(FileSystem fileSystem) {
+    this(fileSystem, "/");
+  }
+
+  public HDFSLocationFactory(FileSystem fileSystem, String pathBase) {
+    String normalized = pathBase.equals("/") ? "" : pathBase;
+    if (normalized.endsWith("/")) {
+      normalized = normalized.substring(0, normalized.length() - 1);
+    }
+    this.fileSystem = fileSystem;
+    this.pathBase = normalized;
+  }
+
+  @Override
+  public Location create(String path) {
+    String relative = path.startsWith("/") ? path.substring(1) : path;
+    String fullPath = fileSystem.getUri() + "/" + pathBase + "/" + relative;
+    return new HDFSLocation(fileSystem, new Path(fullPath));
+  }
+
+  @Override
+  public Location create(URI uri) {
+    String fsUri = fileSystem.getUri().toString();
+    if (!uri.toString().startsWith(fsUri)) {
+      // Not under this file system's URI, so the given URI is used verbatim.
+      return new HDFSLocation(fileSystem, new Path(uri));
+    }
+    String prefix = uri.isAbsolute() ? fsUri : fsUri + "/" + pathBase + "/";
+    return new HDFSLocation(fileSystem, new Path(prefix + uri.getPath()));
+  }
+
+  @Override
+  public Location getHomeLocation() {
+    return new HDFSLocation(fileSystem, fileSystem.getHomeDirectory());
+  }
+
+  /**
+   * Returns the underlying {@link FileSystem} object.
+   */
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  private static FileSystem getFileSystem(Configuration configuration) {
+    try {
+      return FileSystem.get(configuration);
+    } catch (IOException e) {
+      // Rethrown through Guava's Throwables so the calling constructors need no throws clause.
+      throw Throwables.propagate(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/filesystem/package-info.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/filesystem/package-info.java b/yarn/src/main/java/org/apache/twill/filesystem/package-info.java
new file mode 100644
index 0000000..2ca09fd
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/filesystem/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Contains HDFS location classes.
+ */
+package org.apache.twill.filesystem;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java
new file mode 100644
index 0000000..47dd07c
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.SystemMessages;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.concurrent.Executor;
+
+/**
+ * A base implementation of {@link Service} that handles secure token update.
+ */
+public abstract class AbstractTwillService implements Service {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillService.class);
+
+  protected final Location applicationLocation;
+
+  protected volatile Credentials credentials;
+
+  protected AbstractTwillService(Location applicationLocation) {
+    this.applicationLocation = applicationLocation;
+  }
+
+  protected abstract Service getServiceDelegate();
+
+  /**
+   * Returns the location of the secure store, or {@code null} if either not running in secure mode or an error
+   * occurs when trying to acquire the location.
+   */
+  protected final Location getSecureStoreLocation() {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return null;
+    }
+    try {
+      return applicationLocation.append(Constants.Files.CREDENTIALS);
+    } catch (IOException e) {
+      LOG.error("Failed to create secure store location.", e);
+      return null;
+    }
+  }
+
+  /**
+   * Attempts to handle a secure store update message; failures while updating are logged, not thrown.
+   *
+   * @param message The message received
+   * @return {@code true} if the message requests a secure store update, {@code false} otherwise.
+   */
+  protected final boolean handleSecureStoreUpdate(Message message) {
+    if (!SystemMessages.SECURE_STORE_UPDATED.equals(message)) {
+      return false;
+    }
+
+    // If not in secure mode, simply ignore the message.
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return true;
+    }
+
+    try {
+      Credentials credentials = new Credentials();
+      Location location = getSecureStoreLocation();
+      DataInputStream input = new DataInputStream(new BufferedInputStream(location.getInputStream()));
+      try {
+        credentials.readTokenStorageStream(input);
+      } finally {
+        input.close();
+      }
+
+      UserGroupInformation.getCurrentUser().addCredentials(credentials);
+      this.credentials = credentials;
+
+      LOG.info("Secure store updated from {}.", location.toURI());
+
+    } catch (Throwable t) {
+      LOG.error("Failed to update secure store.", t);
+    }
+
+    return true;
+  }
+
+  @Override
+  public final ListenableFuture<State> start() {
+    return getServiceDelegate().start();
+  }
+
+  @Override
+  public final State startAndWait() {
+    return Futures.getUnchecked(start());
+  }
+
+  @Override
+  public final boolean isRunning() {
+    return getServiceDelegate().isRunning();
+  }
+
+  @Override
+  public final State state() {
+    return getServiceDelegate().state();
+  }
+
+  @Override
+  public final ListenableFuture<State> stop() {
+    return getServiceDelegate().stop();
+  }
+
+  @Override
+  public final State stopAndWait() {
+    return Futures.getUnchecked(stop());
+  }
+
+  @Override
+  public final void addListener(Listener listener, Executor executor) {
+    getServiceDelegate().addListener(listener, executor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
new file mode 100644
index 0000000..4ffb023
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.common.Services;
+import org.apache.twill.filesystem.HDFSLocationFactory;
+import org.apache.twill.filesystem.LocalLocationFactory;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.logging.KafkaAppender;
+import org.apache.twill.zookeeper.ZKClientService;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import ch.qos.logback.classic.util.ContextInitializer;
+import ch.qos.logback.core.joran.spi.JoranException;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.ILoggerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.InputSource;
+
+import java.io.File;
+import java.io.StringReader;
+import java.net.URI;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Base class for main classes that start a service together with its ZooKeeper client and wait for completion.
+ */
+public abstract class ServiceMain {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ServiceMain.class);
+
+  static {
+    // This is to work around detection of HADOOP_HOME (HADOOP-9422)
+    if (!System.getenv().containsKey("HADOOP_HOME") && System.getProperty("hadoop.home.dir") == null) {
+      System.setProperty("hadoop.home.dir", new File("").getAbsolutePath());
+    }
+  }
+
+  protected final void doMain(final ZKClientService zkClientService,
+                              final Service service) throws ExecutionException, InterruptedException {
+    configureLogger();
+
+    final String serviceName = service.toString();
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        Services.chainStop(service, zkClientService);
+      }
+    });
+
+    // Listener for state changes of the service
+    ListenableFuture<Service.State> completion = Services.getCompletionFuture(service);
+
+    // Starts the service
+    LOG.info("Starting service {}.", serviceName);
+    Futures.getUnchecked(Services.chainStart(zkClientService, service));
+    LOG.info("Service {} started.", serviceName);
+    try {
+      completion.get();
+      LOG.info("Service {} completed.", serviceName);
+    } catch (Throwable t) {
+      LOG.warn("Exception thrown from service {}.", serviceName, t);
+      throw Throwables.propagate(t);
+    } finally {
+      ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
+      if (loggerFactory instanceof LoggerContext) {
+        ((LoggerContext) loggerFactory).stop();
+      }
+    }
+  }
+
+  protected abstract String getHostname();
+
+  protected abstract String getKafkaZKConnect();
+
+  /**
+   * Returns the {@link Location} for the application based on the env {@link EnvKeys#TWILL_APP_DIR}.
+   */
+  protected static Location createAppLocation(Configuration conf) {
+    // Note: Choosing the LocationFactory from the URI scheme is a little bit hacky, refactor it later.
+    URI appDir = URI.create(System.getenv(EnvKeys.TWILL_APP_DIR));
+
+    try {
+      if ("file".equals(appDir.getScheme())) {
+        return new LocalLocationFactory().create(appDir);
+      }
+
+      if ("hdfs".equals(appDir.getScheme())) {
+        if (UserGroupInformation.isSecurityEnabled()) {
+          return new HDFSLocationFactory(FileSystem.get(conf)).create(appDir);
+        }
+
+        String fsUser = System.getenv(EnvKeys.TWILL_FS_USER);
+        if (fsUser == null) {
+          throw new IllegalStateException("Missing environment variable " + EnvKeys.TWILL_FS_USER);
+        }
+        return new HDFSLocationFactory(FileSystem.get(FileSystem.getDefaultUri(conf), conf, fsUser)).create(appDir);
+      }
+
+      LOG.warn("Unsupported location type {}.", appDir);
+      throw new IllegalArgumentException("Unsupported location type " + appDir);
+
+    } catch (Exception e) {
+      LOG.error("Failed to create application location for {}.", appDir, e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private void configureLogger() {
+    // Check if SLF4J is bound to logback in the current environment
+    ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
+    if (!(loggerFactory instanceof LoggerContext)) {
+      return;
+    }
+
+    LoggerContext context = (LoggerContext) loggerFactory;
+    context.reset();
+    JoranConfigurator configurator = new JoranConfigurator();
+    configurator.setContext(context);
+
+    try {
+      File twillLogback = new File(Constants.Files.LOGBACK_TEMPLATE);
+      if (twillLogback.exists()) {
+        configurator.doConfigure(twillLogback);
+      }
+      new ContextInitializer(context).autoConfig();
+    } catch (JoranException e) {
+      throw Throwables.propagate(e);
+    }
+    doConfigure(configurator, getLogConfig(getLoggerLevel(context.getLogger(Logger.ROOT_LOGGER_NAME))));
+  }
+
+  private void doConfigure(JoranConfigurator configurator, String config) {
+    try {
+      configurator.doConfigure(new InputSource(new StringReader(config)));
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private String getLogConfig(String rootLevel) {
+    return
+      "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
+      "<configuration>\n" +
+      "    <appender name=\"KAFKA\" class=\"" + KafkaAppender.class.getName() + "\">\n" +
+      "        <topic>" + Constants.LOG_TOPIC + "</topic>\n" +
+      "        <hostname>" + getHostname() + "</hostname>\n" +
+      "        <zookeeper>" + getKafkaZKConnect() + "</zookeeper>\n" +
+      "    </appender>\n" +
+      "    <logger name=\"org.apache.twill.internal.logging\" additivity=\"false\" />\n" +
+      "    <root level=\"" + rootLevel + "\">\n" +
+      "        <appender-ref ref=\"KAFKA\"/>\n" +
+      "    </root>\n" +
+      "</configuration>";
+  }
+
+  private String getLoggerLevel(Logger logger) {
+    if (logger instanceof ch.qos.logback.classic.Logger) {
+      return ((ch.qos.logback.classic.Logger) logger).getLevel().toString();
+    }
+
+    if (logger.isTraceEnabled()) {
+      return "TRACE";
+    }
+    if (logger.isDebugEnabled()) {
+      return "DEBUG";
+    }
+    if (logger.isInfoEnabled()) {
+      return "INFO";
+    }
+    if (logger.isWarnEnabled()) {
+      return "WARN";
+    }
+    if (logger.isErrorEnabled()) {
+      return "ERROR";
+    }
+    return "OFF";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
new file mode 100644
index 0000000..028df7b
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+/**
+ * Immutable holder of the application master live node data: app id, app id cluster time and container id.
+ */
+public final class ApplicationMasterLiveNodeData {
+
+  private final int appId;
+  private final long appIdClusterTime;
+  private final String containerId;
+
+  public ApplicationMasterLiveNodeData(int appId, long appIdClusterTime, String containerId) {
+    this.appId = appId;
+    this.appIdClusterTime = appIdClusterTime;
+    this.containerId = containerId;
+  }
+
+  public int getAppId() {
+    return appId;
+  }
+
+  public long getAppIdClusterTime() {
+    return appIdClusterTime;
+  }
+
+  public String getContainerId() {
+    return containerId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
new file mode 100644
index 0000000..b34a7a2
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.ServiceMain;
+import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Entry point that launches the {@link ApplicationMasterService}.
+ */
+public final class ApplicationMasterMain extends ServiceMain {
+
+  private final String kafkaZKConnect;
+
+  private ApplicationMasterMain(String kafkaZKConnect) {
+    this.kafkaZKConnect = kafkaZKConnect;
+  }
+
+  /**
+   * Starts the application master.
+   */
+  public static void main(String[] args) throws Exception {
+    String zkConnectStr = System.getenv(EnvKeys.TWILL_ZK_CONNECT);
+    File specFile = new File(Constants.Files.TWILL_SPEC);
+    RunId appRunId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
+
+    ZKClientService zkClient = ZKClientServices.delegate(
+      ZKClients.reWatchOnExpire(
+        ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
+                                 RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
+
+    Configuration hadoopConf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
+    Service amService = new ApplicationMasterService(
+      appRunId, zkClient, specFile, new VersionDetectYarnAMClientFactory(hadoopConf), createAppLocation(hadoopConf));
+    String kafkaConnect = String.format("%s/%s/kafka", zkConnectStr, appRunId.getId());
+    new ApplicationMasterMain(kafkaConnect).doMain(zkClient, amService);
+  }
+
+  @Override
+  protected String getHostname() {
+    try {
+      InetAddress localHost = InetAddress.getLocalHost();
+      return localHost.getCanonicalHostName();
+    } catch (UnknownHostException unresolved) {
+      return "unknown";
+    }
+  }
+
+  @Override
+  protected String getKafkaZKConnect() {
+    return kafkaZKConnect;
+  }
+}