You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 22:59:56 UTC

[14/28] Making maven site works.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
new file mode 100644
index 0000000..b327b94
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnLocalResource.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * {@link YarnLocalResource} implementation for Hadoop 2.0 that wraps a YARN {@link LocalResource}
+ * record and forwards every accessor to it.
+ */
+public final class Hadoop20YarnLocalResource implements YarnLocalResource {
+
+  /** The wrapped YARN record; it holds all the state of this object. */
+  private final LocalResource localResource;
+
+  /**
+   * Creates an instance backed by a new {@link LocalResource} record obtained from {@link Records}.
+   */
+  public Hadoop20YarnLocalResource() {
+    this.localResource = Records.newRecord(LocalResource.class);
+  }
+
+  /**
+   * Returns the wrapped {@link LocalResource} record.
+   *
+   * @param <T> the type the caller wants the record as; it has to be assignable from
+   *            {@link LocalResource}, otherwise the caller ends up with a {@link ClassCastException}
+   * @return the wrapped record, cast to {@code T}
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> T getLocalResource() {
+    // The cast cannot be verified here because T is erased at runtime; the caller chooses T.
+    return (T) localResource;
+  }
+
+  // Every method below simply delegates to the wrapped LocalResource record.
+
+  @Override
+  public URL getResource() {
+    return localResource.getResource();
+  }
+
+  @Override
+  public void setResource(URL resource) {
+    localResource.setResource(resource);
+  }
+
+  @Override
+  public long getSize() {
+    return localResource.getSize();
+  }
+
+  @Override
+  public void setSize(long size) {
+    localResource.setSize(size);
+  }
+
+  @Override
+  public long getTimestamp() {
+    return localResource.getTimestamp();
+  }
+
+  @Override
+  public void setTimestamp(long timestamp) {
+    localResource.setTimestamp(timestamp);
+  }
+
+  @Override
+  public LocalResourceType getType() {
+    return localResource.getType();
+  }
+
+  @Override
+  public void setType(LocalResourceType type) {
+    localResource.setType(type);
+  }
+
+  @Override
+  public LocalResourceVisibility getVisibility() {
+    return localResource.getVisibility();
+  }
+
+  @Override
+  public void setVisibility(LocalResourceVisibility visibility) {
+    localResource.setVisibility(visibility);
+  }
+
+  @Override
+  public String getPattern() {
+    return localResource.getPattern();
+  }
+
+  @Override
+  public void setPattern(String pattern) {
+    localResource.setPattern(pattern);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
new file mode 100644
index 0000000..98ecc67
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnNMClient.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.common.Cancellable;
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+
+/**
+ * {@link YarnNMClient} implementation for Hadoop 2.0 that talks to the node manager directly through
+ * the {@link ContainerManager} RPC protocol.
+ */
+public final class Hadoop20YarnNMClient implements YarnNMClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnNMClient.class);
+
+  /** Pause, in milliseconds, between two status polls while waiting for a container to stop. */
+  private static final long STATUS_POLL_INTERVAL_MS = 100L;
+
+  private final YarnRPC yarnRPC;
+  private final Configuration yarnConf;
+
+  /**
+   * @param yarnRPC RPC factory used to create the {@link ContainerManager} proxies
+   * @param yarnConf configuration passed to {@code yarnRPC} when a proxy is created
+   */
+  public Hadoop20YarnNMClient(YarnRPC yarnRPC, Configuration yarnConf) {
+    this.yarnRPC = yarnRPC;
+    this.yarnConf = yarnConf;
+  }
+
+  /**
+   * Launches the given container on its node manager.
+   *
+   * The launch context is completed with the current system user as well as the id and the resource
+   * of the container before the start request is sent.
+   *
+   * @param containerInfo describes the container to start
+   * @param launchContext the launch context to start the container with
+   * @return a {@link Cancellable} that stops the container and waits for it to complete
+   * @throws RuntimeException (through {@link Throwables#propagate}) if the start request fails
+   */
+  @Override
+  public Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext launchContext) {
+    ContainerLaunchContext context = launchContext.getLaunchContext();
+    context.setUser(System.getProperty("user.name"));
+
+    Container container = containerInfo.getContainer();
+
+    context.setContainerId(container.getId());
+    context.setResource(container.getResource());
+
+    StartContainerRequest startRequest = Records.newRecord(StartContainerRequest.class);
+    startRequest.setContainerLaunchContext(context);
+
+    ContainerManager manager = connectContainerManager(container);
+    try {
+      manager.startContainer(startRequest);
+      return new ContainerTerminator(container, manager);
+    } catch (YarnRemoteException e) {
+      LOG.error("Error in launching process", e);
+      throw Throwables.propagate(e);
+    }
+
+  }
+
+  /**
+   * Helper to connect to container manager (node manager).
+   *
+   * The address is built from the host and port of the node the container is assigned to.
+   */
+  private ContainerManager connectContainerManager(Container container) {
+    String cmIpPortStr = String.format("%s:%d", container.getNodeId().getHost(), container.getNodeId().getPort());
+    InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
+    return ((ContainerManager) yarnRPC.getProxy(ContainerManager.class, cmAddress, yarnConf));
+  }
+
+  /**
+   * A {@link Cancellable} that stops a started container and blocks until the node manager reports
+   * the container as {@link ContainerState#COMPLETE}.
+   */
+  private static final class ContainerTerminator implements Cancellable {
+
+    private final Container container;
+    private final ContainerManager manager;
+
+    private ContainerTerminator(Container container, ContainerManager manager) {
+      this.container = container;
+      this.manager = manager;
+    }
+
+    /**
+     * Sends a stop request for the container and polls its status until it is complete.
+     *
+     * If the calling thread is interrupted while waiting, the interrupt flag is restored and the
+     * method returns without waiting any further.
+     *
+     * @throws RuntimeException (through {@link Throwables#propagate}) if a remote call fails
+     */
+    @Override
+    public void cancel() {
+      LOG.info("Request to stop container {}.", container.getId());
+      StopContainerRequest stopRequest = Records.newRecord(StopContainerRequest.class);
+      stopRequest.setContainerId(container.getId());
+      try {
+        manager.stopContainer(stopRequest);
+        boolean completed = false;
+        while (!completed) {
+          GetContainerStatusRequest statusRequest = Records.newRecord(GetContainerStatusRequest.class);
+          statusRequest.setContainerId(container.getId());
+          GetContainerStatusResponse statusResponse = manager.getContainerStatus(statusRequest);
+          LOG.info("Container status: {} {}", statusResponse.getStatus(), statusResponse.getStatus().getDiagnostics());
+
+          completed = (statusResponse.getStatus().getState() == ContainerState.COMPLETE);
+          if (!completed) {
+            // Back off before asking again, so that the node manager is not flooded with status
+            // requests (and the log with status lines) while the container is shutting down.
+            Thread.sleep(STATUS_POLL_INTERVAL_MS);
+          }
+        }
+        LOG.info("Container {} stopped.", container.getId());
+      } catch (YarnRemoteException e) {
+        LOG.error("Fail to stop container {}", container.getId(), e);
+        throw Throwables.propagate(e);
+      } catch (InterruptedException e) {
+        // Keep the interruption visible to the caller and give up waiting for the completion.
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while waiting for container {} to stop.", container.getId());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
new file mode 100644
index 0000000..26b6fa2
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClient.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn.ports;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.Service;
+
+/**
+ * Client interface used by an application master to talk to the ResourceManager.
+ * Ported from Apache Hadoop YARN.
+ */
+public interface AMRMClient extends Service {
+
+  /**
+   * Value used to define no locality.
+   */
+  static final String ANY = "*";
+
+  /**
+   * Object to represent container request for resources.
+   * Resources may be localized to nodes and racks.
+   * Resources may be assigned priorities.
+   * Can ask for multiple containers of a given type.
+   */
+  public static class ContainerRequest {
+    // Resource capability asked for each container.
+    Resource capability;
+    // Hosts the containers are preferred on; null if there is no host preference.
+    String[] hosts;
+    // Racks the containers are preferred on; null if there is no rack preference.
+    String[] racks;
+    // Priority of the request.
+    Priority priority;
+    // Number of containers asked for.
+    int containerCount;
+
+    /**
+     * @param capability resource capability of each requested container
+     * @param hosts preferred hosts, may be {@code null}; the array is copied
+     * @param racks preferred racks, may be {@code null}; the array is copied
+     * @param priority priority of the request
+     * @param containerCount number of containers requested
+     */
+    public ContainerRequest(Resource capability, String[] hosts,
+                            String[] racks, Priority priority, int containerCount) {
+      this.capability = capability;
+      this.hosts = (hosts != null ? hosts.clone() : null);
+      this.racks = (racks != null ? racks.clone() : null);
+      this.priority = priority;
+      this.containerCount = containerCount;
+    }
+
+    /**
+     * Returns a description made of the capability, the priority and the container count.
+     * Hosts and racks are not part of it.
+     */
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Capability[").append(capability).append("]");
+      sb.append("Priority[").append(priority).append("]");
+      sb.append("ContainerCount[").append(containerCount).append("]");
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Register the application master. This must be called before any
+   * other interaction.
+   * @param appHostName Name of the host on which master is running
+   * @param appHostPort Port master is listening on
+   * @param appTrackingUrl URL at which the master info can be seen
+   * @return <code>RegisterApplicationMasterResponse</code>
+   * @throws YarnRemoteException if the call to the ResourceManager fails
+   */
+  public RegisterApplicationMasterResponse
+  registerApplicationMaster(String appHostName,
+                            int appHostPort,
+                            String appTrackingUrl)
+    throws YarnRemoteException;
+
+  /**
+   * Request additional containers and receive new container allocations.
+   * Requests made via <code>addContainerRequest</code> are sent to the
+   * <code>ResourceManager</code>. New containers assigned to the master are
+   * retrieved. Status of completed containers and node health updates are
+   * also retrieved.
+   * This also doubles as a heartbeat to the ResourceManager and must be
+   * made periodically.
+   * The call may not always return any new allocations of containers.
+   * App should not make concurrent allocate requests. May cause request loss.
+   * @param progressIndicator Indicates progress made by the master
+   * @return the response of the allocate request
+   * @throws YarnRemoteException if the call to the ResourceManager fails
+   */
+  public AllocationResponse allocate(float progressIndicator)
+    throws YarnRemoteException;
+
+  /**
+   * Unregister the Application Master. This must be called in the end.
+   * @param appStatus Success/Failure status of the master
+   * @param appMessage Diagnostics message on failure
+   * @param appTrackingUrl New URL to get master info
+   * @throws YarnRemoteException if the call to the ResourceManager fails
+   */
+  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+                                          String appMessage,
+                                          String appTrackingUrl)
+    throws YarnRemoteException;
+
+  /**
+   * Request containers for resources before calling <code>allocate</code>.
+   * @param req Resource request
+   */
+  public void addContainerRequest(ContainerRequest req);
+
+  /**
+   * Remove previous container request. The previous container request may have
+   * already been sent to the ResourceManager. So even after the remove request
+   * the app must be prepared to receive an allocation for the previous request.
+   * @param req Resource request
+   */
+  public void removeContainerRequest(ContainerRequest req);
+
+  /**
+   * Release containers assigned by the Resource Manager. If the app cannot use
+   * the container or wants to give up the container then it can release it.
+   * The app needs to make new requests for the released resource capability if
+   * it still needs it. For example, if it released non-local resources.
+   * @param containerId Id of the container to release
+   */
+  public void releaseAssignedContainer(ContainerId containerId);
+
+  /**
+   * Get the currently available resources in the cluster.
+   * A valid value is available after a call to allocate has been made.
+   * @return Currently available resources
+   */
+  public Resource getClusterAvailableResources();
+
+  /**
+   * Get the current number of nodes in the cluster.
+   * A valid value is available after a call to allocate has been made.
+   * @return Current number of nodes in the cluster
+   */
+  public int getClusterNodeCount();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
new file mode 100644
index 0000000..c1bd75a
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn.ports;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * {@link AMRMClient} implementation that talks to the ResourceManager scheduler through the
+ * {@link AMRMProtocol} RPC protocol.
+ * Ported from Apache Hadoop YARN.
+ */
+public final class AMRMClientImpl extends AbstractService implements AMRMClient {
+
+  private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
+
+  // Factory used to create the register and finish request records.
+  private final RecordFactory recordFactory =
+    RecordFactoryProvider.getRecordFactory(null);
+
+  // Response id of the last successful allocate call; it is sent with the next allocate request.
+  private int lastResponseId = 0;
+
+  // RPC proxy to the ResourceManager; created in start().
+  protected AMRMProtocol rmClient;
+  protected final ApplicationAttemptId appAttemptId;
+  // The two values below are refreshed on every successful allocate call.
+  protected Resource clusterAvailableResources;
+  protected int clusterNodeCount;
+
+  // All outstanding requests, organized in three levels:
+  //   Priority -> ResourceName (e.g., hostname, rackname, *) -> Resource Capability -> ResourceRequest
+  protected final Map<Priority, Map<String, Map<Resource, ResourceRequest>>> remoteRequestsTable =
+    new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
+
+  // Resource requests that were added or changed and need to be sent on the next allocate call.
+  protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
+    new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
+  // Containers to release on the next allocate call.
+  protected final Set<ContainerId> release = new TreeSet<ContainerId>();
+
+  /**
+   * @param appAttemptId id of the application attempt this client acts for
+   */
+  public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
+    super(AMRMClientImpl.class.getName());
+    this.appAttemptId = appAttemptId;
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    super.init(conf);
+  }
+
+  /**
+   * Connects to the ResourceManager scheduler.
+   *
+   * The scheduler address is read from the configuration. When security is enabled, the application
+   * master token is decoded from the environment and added to the current user before the RPC
+   * proxy gets created on behalf of that user.
+   *
+   * @throws YarnException if the current user cannot be determined or the token cannot be decoded
+   */
+  @Override
+  public synchronized void start() {
+    final YarnConfiguration conf = new YarnConfiguration(getConfig());
+    final YarnRPC rpc = YarnRPC.create(conf);
+    final InetSocketAddress rmAddress = conf.getSocketAddr(
+      YarnConfiguration.RM_SCHEDULER_ADDRESS,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+
+    UserGroupInformation currentUser;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      // NOTE(review): the environment variable is presumably always set by YARN in a secure
+      // cluster; a missing value is not handled here.
+      String tokenURLEncodedStr = System.getenv().get(
+        ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
+      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+
+      try {
+        token.decodeFromUrlString(tokenURLEncodedStr);
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+
+      SecurityUtil.setTokenService(token, rmAddress);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AppMasterToken is " + token);
+      }
+      currentUser.addToken(token);
+    }
+
+    rmClient = currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+      @Override
+      public AMRMProtocol run() {
+        return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress,
+                                           conf);
+      }
+    });
+    LOG.debug("Connecting to ResourceManager at " + rmAddress);
+    super.start();
+  }
+
+  /**
+   * Stops the RPC proxy, if one was created, and then the service itself.
+   */
+  @Override
+  public synchronized void stop() {
+    if (this.rmClient != null) {
+      RPC.stopProxy(this.rmClient);
+    }
+    super.stop();
+  }
+
+  /**
+   * Builds a register request out of the arguments and sends it to the ResourceManager.
+   * The tracking URL is only set on the request when it is not {@code null}.
+   */
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+    String appHostName, int appHostPort, String appTrackingUrl)
+    throws YarnRemoteException {
+    // do this only once ???
+    RegisterApplicationMasterRequest request = recordFactory
+      .newRecordInstance(RegisterApplicationMasterRequest.class);
+    synchronized (this) {
+      request.setApplicationAttemptId(appAttemptId);
+    }
+    request.setHost(appHostName);
+    request.setRpcPort(appHostPort);
+    if (appTrackingUrl != null) {
+      request.setTrackingUrl(appTrackingUrl);
+    }
+    RegisterApplicationMasterResponse response = rmClient
+      .registerApplicationMaster(request);
+    return response;
+  }
+
+  /**
+   * Sends all pending asks and releases to the ResourceManager and returns its response.
+   *
+   * The pending asks and releases are cleared before the remote call is made. If the call fails,
+   * they are put back so that the next call sends them again.
+   */
+  @Override
+  public AllocationResponse allocate(float progressIndicator)
+    throws YarnRemoteException {
+    AllocateResponse allocateResponse = null;
+    ArrayList<ResourceRequest> askList = null;
+    ArrayList<ContainerId> releaseList = null;
+    AllocateRequest allocateRequest = null;
+
+    try {
+      synchronized (this) {
+        askList = new ArrayList<ResourceRequest>(ask);
+        releaseList = new ArrayList<ContainerId>(release);
+        // optimistically clear this collection assuming no RPC failure
+        ask.clear();
+        release.clear();
+        allocateRequest = BuilderUtils
+          .newAllocateRequest(appAttemptId, lastResponseId, progressIndicator,
+                              askList, releaseList);
+      }
+
+      // The remote call is made outside of the lock, so requests can be added meanwhile.
+      allocateResponse = rmClient.allocate(allocateRequest);
+      AllocationResponse response = AllocationResponses.create(allocateResponse);
+
+      synchronized (this) {
+        // update these on successful RPC
+        clusterNodeCount = allocateResponse.getNumClusterNodes();
+        lastResponseId = response.getResponseId();
+        clusterAvailableResources = response.getAvailableResources();
+      }
+
+      return response;
+    } finally {
+      // TODO how to differentiate remote YARN exception vs error in RPC
+      if (allocateResponse == null) {
+        // We hit an exception in allocate()
+        // Preserve ask and release for next call to allocate()
+        synchronized (this) {
+          release.addAll(releaseList);
+          // Requests could have been added or deleted during call to allocate.
+          // If requests were added/removed then there is nothing to do since
+          // the ResourceRequest object in ask would have the actual new value.
+          // If ask does not have this ResourceRequest then it was unchanged and
+          // so we can add the value back safely.
+          // This assumes that there will no concurrent calls to allocate() and
+          // so we don't have to worry about ask being changed in the
+          // synchronized block at the beginning of this method.
+          for (ResourceRequest oldAsk : askList) {
+            if (!ask.contains(oldAsk)) {
+              ask.add(oldAsk);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds a finish request out of the arguments and sends it to the ResourceManager.
+   * The diagnostics message and the tracking URL are only set when they are not {@code null}.
+   */
+  @Override
+  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+                                          String appMessage, String appTrackingUrl) throws YarnRemoteException {
+    FinishApplicationMasterRequest request = recordFactory
+      .newRecordInstance(FinishApplicationMasterRequest.class);
+    request.setAppAttemptId(appAttemptId);
+    request.setFinishApplicationStatus(appStatus);
+    if (appMessage != null) {
+      request.setDiagnostics(appMessage);
+    }
+    if (appTrackingUrl != null) {
+      request.setTrackingUrl(appTrackingUrl);
+    }
+    rmClient.finishApplicationMaster(request);
+  }
+
+  /**
+   * Records the request for every host and rack it names, and always for {@link #ANY} as well.
+   */
+  @Override
+  public synchronized void addContainerRequest(ContainerRequest req) {
+    // Create resource requests
+    if (req.hosts != null) {
+      for (String host : req.hosts) {
+        addResourceRequest(req.priority, host, req.capability, req.containerCount);
+      }
+    }
+
+    if (req.racks != null) {
+      for (String rack : req.racks) {
+        addResourceRequest(req.priority, rack, req.capability, req.containerCount);
+      }
+    }
+
+    // Off switch
+    addResourceRequest(req.priority, ANY, req.capability, req.containerCount);
+  }
+
+  /**
+   * Reverts {@link #addContainerRequest} by decrementing the counts for every host and rack the
+   * request names, and for {@link #ANY}.
+   */
+  @Override
+  public synchronized void removeContainerRequest(ContainerRequest req) {
+    // Update resource requests
+    if (req.hosts != null) {
+      for (String hostName : req.hosts) {
+        decResourceRequest(req.priority, hostName, req.capability, req.containerCount);
+      }
+    }
+
+    if (req.racks != null) {
+      for (String rack : req.racks) {
+        decResourceRequest(req.priority, rack, req.capability, req.containerCount);
+      }
+    }
+
+    decResourceRequest(req.priority, ANY, req.capability, req.containerCount);
+  }
+
+  /**
+   * Remembers the container so that it gets released with the next call to allocate.
+   */
+  @Override
+  public synchronized void releaseAssignedContainer(ContainerId containerId) {
+    release.add(containerId);
+  }
+
+  @Override
+  public synchronized Resource getClusterAvailableResources() {
+    return clusterAvailableResources;
+  }
+
+  @Override
+  public synchronized int getClusterNodeCount() {
+    return clusterNodeCount;
+  }
+
+  /**
+   * Puts the given request into the ask set, replacing any request the set comparator considers
+   * to be the same.
+   */
+  private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
+    // This code looks weird but is needed because of the following scenario.
+    // A ResourceRequest is removed from the remoteRequestTable. A 0 container
+    // request is added to 'ask' to notify the RM about not needing it any more.
+    // Before the call to allocate, the user now requests more containers. If
+    // the locations of the 0 size request and the new request are the same
+    // (with the difference being only container count), then the set comparator
+    // will consider both to be the same and not add the new request to ask. So
+    // we need to check for the "same" request being present and remove it and
+    // then add it back. The comparator is container count agnostic.
+    // This should happen only rarely but we do need to guard against it.
+    if (ask.contains(remoteRequest)) {
+      ask.remove(remoteRequest);
+    }
+    ask.add(remoteRequest);
+  }
+
+  /**
+   * Increases the number of containers requested for the given priority, resource name and
+   * capability, creating the entries in the request table as needed, and schedules the resulting
+   * request to be sent to the ResourceManager.
+   */
+  private void addResourceRequest(Priority priority, String resourceName,
+                                  Resource capability, int containerCount) {
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+      this.remoteRequestsTable.get(priority);
+    if (remoteRequests == null) {
+      remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
+      this.remoteRequestsTable.put(priority, remoteRequests);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added priority=" + priority);
+      }
+    }
+    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      reqMap = new HashMap<Resource, ResourceRequest>();
+      remoteRequests.put(resourceName, reqMap);
+    }
+    ResourceRequest remoteRequest = reqMap.get(capability);
+    if (remoteRequest == null) {
+      // Start from zero containers; the requested count is added just below.
+      remoteRequest = BuilderUtils.
+        newResourceRequest(priority, resourceName, capability, 0);
+      reqMap.put(capability, remoteRequest);
+    }
+
+    remoteRequest.setNumContainers(remoteRequest.getNumContainers() + containerCount);
+
+    // Note this down for next interaction with ResourceManager
+    addResourceRequestToAsk(remoteRequest);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("addResourceRequest:" + " applicationId="
+                  + appAttemptId + " priority=" + priority.getPriority()
+                  + " resourceName=" + resourceName + " numContainers="
+                  + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
+  }
+
+  /**
+   * Decreases the number of containers requested for the given priority, resource name and
+   * capability, never going below zero, and schedules the resulting request to be sent to the
+   * ResourceManager. Entries of the request table that drop to zero containers are removed.
+   *
+   * Nothing happens if there is no request for the given priority, resource name or capability.
+   */
+  private void decResourceRequest(Priority priority, String resourceName,
+                                  Resource capability, int containerCount) {
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+      this.remoteRequestsTable.get(priority);
+
+    if (remoteRequests == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as priority " + priority
+                    + " is not present in request table");
+      }
+      return;
+    }
+
+    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as " + resourceName
+                    + " is not present in request table");
+      }
+      return;
+    }
+    ResourceRequest remoteRequest = reqMap.get(capability);
+    if (remoteRequest == null) {
+      // Same treatment as a missing priority or resource name: a removal for a capability that
+      // was never requested must not fail with a NullPointerException.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as capability " + capability
+                    + " is not present in request table");
+      }
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+                  + appAttemptId + " priority=" + priority.getPriority()
+                  + " resourceName=" + resourceName + " numContainers="
+                  + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
+
+    remoteRequest.
+      setNumContainers(remoteRequest.getNumContainers() - containerCount);
+    if (remoteRequest.getNumContainers() < 0) {
+      // guard against spurious removals
+      remoteRequest.setNumContainers(0);
+    }
+    // Send the ResourceRequest to RM even if is 0 because it needs to override
+    // a previously sent value. If ResourceRequest was not sent previously then
+    // sending 0 ought to be a no-op on RM.
+    addResourceRequestToAsk(remoteRequest);
+
+    // Delete entries from map if no longer needed.
+    if (remoteRequest.getNumContainers() == 0) {
+      reqMap.remove(capability);
+      if (reqMap.size() == 0) {
+        remoteRequests.remove(resourceName);
+      }
+      if (remoteRequests.size() == 0) {
+        remoteRequestsTable.remove(priority);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      // Logged at debug level to match both the guard above and the BEFORE message.
+      LOG.debug("AFTER decResourceRequest:" + " applicationId="
+                  + appAttemptId + " priority=" + priority.getPriority()
+                  + " resourceName=" + resourceName + " numContainers="
+                  + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
new file mode 100644
index 0000000..89734fc
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn.ports;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.util.List;
+
+/**
+ * This interface is to abstract the differences in Vanilla Hadoop YARN 2.0 and CDH 4.4.
+ * It exposes the response id, the available resources, the allocated containers and the statuses of
+ * completed containers of an allocate response through one common set of accessors.
+ * Instances are obtained through {@link AllocationResponses#create(Object)}.
+ */
+public interface AllocationResponse {
+
+  int getResponseId();
+
+  Resource getAvailableResources();
+
+  List<Container> getAllocatedContainers();
+
+  List<ContainerStatus> getCompletedContainersStatuses();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
new file mode 100644
index 0000000..ea46c3b
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn.ports;
+
+import com.google.common.base.Throwables;
+import com.google.common.reflect.TypeToken;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.util.List;
+
+/**
+ * Factory for building instances of {@link AllocationResponse} based on the response type.
+ * <p>
+ * On vanilla YARN 2.0 the allocation details are read from the object returned by
+ * {@code AllocateResponse.getAMResponse()}; on CDH 4.4 they are read from the {@code AllocateResponse}
+ * itself. Which flavor is on the classpath is detected once, reflectively, when this class is initialized.
+ * </p>
+ */
+public final class AllocationResponses {
+
+  /**
+   * A hack for CDH 4.4.0, as the AllocateResponse class is being rewritten and diverted from YARN 2.0.
+   * It is {@code true} when {@code AllocateResponse} has no {@code getAMResponse()} method but does have
+   * a {@code getAllocatedContainers()} method.
+   */
+  private static final boolean IS_CDH_4_4;
+
+  static {
+    boolean result = false;
+    try {
+      try {
+        // See if it is standard YARN 2.0 AllocateResponse object.
+        AllocateResponse.class.getMethod("getAMResponse");
+      } catch (NoSuchMethodException e) {
+        // See if it is CDH 4.4 AllocateResponse object. The probe has to target the YARN AllocateResponse
+        // class: the AllocationResponse interface of this package always declares getAllocatedContainers(),
+        // so probing it would succeed unconditionally and detect nothing.
+        AllocateResponse.class.getMethod("getAllocatedContainers");
+        result = true;
+      }
+    } catch (Exception e) {
+      // Something very wrong in here, as it shouldn't arrive here.
+      // The trace is dumped explicitly since this class has no logger.
+      e.printStackTrace();
+      throw Throwables.propagate(e);
+    }
+
+    IS_CDH_4_4 = result;
+  }
+
+  /**
+   * Wraps the given allocate response into an {@link AllocationResponse}.
+   *
+   * @param response the allocate response object of the YARN flavor in use; on vanilla YARN 2.0 it must
+   *                 have a public no-arg {@code getAMResponse()} method
+   * @return a reflection based view over {@code response} itself (CDH 4.4) or over the object returned by
+   *         its {@code getAMResponse()} method (vanilla YARN 2.0)
+   * @throws RuntimeException if {@code getAMResponse()} cannot be invoked reflectively
+   */
+  public static AllocationResponse create(Object response) {
+    if (IS_CDH_4_4) {
+      return new ReflectionAllocationResponse(response);
+    }
+
+    try {
+      Object amResponse = response.getClass().getMethod("getAMResponse").invoke(response);
+      return new ReflectionAllocationResponse(amResponse);
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * {@link AllocationResponse} that forwards every accessor, by reflection, to the public no-arg method of
+   * the same name on the wrapped object.
+   */
+  private static final class ReflectionAllocationResponse implements AllocationResponse {
+
+    private final Object response;
+
+    private ReflectionAllocationResponse(Object response) {
+      this.response = response;
+    }
+
+    @Override
+    public int getResponseId() {
+      return call("getResponseId", TypeToken.of(Integer.class));
+    }
+
+    @Override
+    public Resource getAvailableResources() {
+      return call("getAvailableResources", TypeToken.of(Resource.class));
+    }
+
+    @Override
+    public List<Container> getAllocatedContainers() {
+      return call("getAllocatedContainers", new TypeToken<List<Container>>() {});
+    }
+
+    @Override
+    public List<ContainerStatus> getCompletedContainersStatuses() {
+      return call("getCompletedContainersStatuses", new TypeToken<List<ContainerStatus>>() {});
+    }
+
+    /**
+     * Invokes the public no-arg method of the given name on the wrapped object and casts the result.
+     *
+     * @param methodName name of the method to invoke
+     * @param resultType expected type of the result
+     * @return the value returned by the invoked method
+     * @throws RuntimeException if the method cannot be found or invoked, or returns an incompatible type
+     */
+    @SuppressWarnings("unchecked")
+    private <T> T call(String methodName, TypeToken<T> resultType) {
+      try {
+        // Only the raw type can be verified at runtime, hence the unchecked cast to T.
+        return (T) resultType.getRawType().cast(response.getClass().getMethod(methodName).invoke(response));
+      } catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+
+  private AllocationResponses() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
new file mode 100644
index 0000000..ce8f90f
--- /dev/null
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * {@link YarnAMClient} implementation for Hadoop 2.1, built on top of {@link AMRMClient}.
+ */
+public final class Hadoop21YarnAMClient extends AbstractIdleService implements YarnAMClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAMClient.class);
+
+  private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
+
+  static {
+    STATUS_TRANSFORM = new Function<ContainerStatus, YarnContainerStatus>() {
+      @Override
+      public YarnContainerStatus apply(ContainerStatus status) {
+        return new Hadoop21YarnContainerStatus(status);
+      }
+    };
+  }
+
+  private final ContainerId containerId;
+  private final Multimap<String, AMRMClient.ContainerRequest> containerRequests;
+  private final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
+  private final Hadoop21YarnNMClient nmClient;
+  private InetSocketAddress trackerAddr;
+  private URL trackerUrl;
+  private Resource maxCapability;
+
+  public Hadoop21YarnAMClient(Configuration conf) {
+    String masterContainerId = System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.name());
+    Preconditions.checkArgument(masterContainerId != null,
+                                "Missing %s from environment", ApplicationConstants.Environment.CONTAINER_ID.name());
+    this.containerId = ConverterUtils.toContainerId(masterContainerId);
+    this.containerRequests = ArrayListMultimap.create();
+
+    this.amrmClient = AMRMClient.createAMRMClient();
+    this.amrmClient.init(conf);
+    this.nmClient = new Hadoop21YarnNMClient(conf);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
+    Preconditions.checkNotNull(trackerUrl, "Tracker URL not set.");
+
+    amrmClient.start();
+    RegisterApplicationMasterResponse response = amrmClient.registerApplicationMaster(trackerAddr.getHostName(),
+                                                                                      trackerAddr.getPort(),
+                                                                                      trackerUrl.toString());
+    maxCapability = response.getMaximumResourceCapability();
+    nmClient.startAndWait();
+  }
+
+  /**
+   * Stops the node manager client, unregisters this application master from the resource manager with
+   * {@link FinalApplicationStatus#SUCCEEDED} and stops the AM-RM client.
+   *
+   * @throws Exception if stopping either client or unregistering fails
+   */
+  @Override
+  protected void shutDown() throws Exception {
+    try {
+      nmClient.stopAndWait();
+      amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, trackerUrl.toString());
+    } finally {
+      // Always stop the AM-RM client, even when one of the steps above fails, so it is not left running.
+      amrmClient.stop();
+    }
+  }
+
+  @Override
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  @Override
+  public String getHost() {
+    return System.getenv().get(ApplicationConstants.Environment.NM_HOST.name());
+  }
+
+  @Override
+  public void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
+    this.trackerAddr = trackerAddr;
+    this.trackerUrl = trackerUrl;
+  }
+
+  /**
+   * Performs one allocate call against the resource manager and reports the outcome to the given handler.
+   * Allocated containers are offered to the handler as launchers; every container in which nothing got
+   * launched is released again. Completed container statuses, if any, are reported afterwards.
+   */
+  @Override
+  public synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
+    AllocateResponse response = amrmClient.allocate(progress);
+
+    // One launcher per container granted in this round.
+    List<ProcessLauncher<YarnContainerInfo>> offered
+      = Lists.newArrayListWithCapacity(response.getAllocatedContainers().size());
+    for (Container granted : response.getAllocatedContainers()) {
+      offered.add(new RunnableProcessLauncher(new Hadoop21YarnContainerInfo(granted), nmClient));
+    }
+
+    if (!offered.isEmpty()) {
+      handler.acquired(offered);
+
+      // Hand back every container the handler did not launch anything in.
+      for (ProcessLauncher<YarnContainerInfo> candidate : offered) {
+        // Every element was created above as a RunnableProcessLauncher, so the cast is safe.
+        RunnableProcessLauncher runnableLauncher = (RunnableProcessLauncher) candidate;
+        if (runnableLauncher.isLaunched()) {
+          continue;
+        }
+        Container unused = runnableLauncher.getContainerInfo().getContainer();
+        LOG.info("Nothing to run in container, releasing it: {}", unused);
+        amrmClient.releaseAssignedContainer(unused.getId());
+      }
+    }
+
+    List<YarnContainerStatus> finished = ImmutableList.copyOf(
+      Iterables.transform(response.getCompletedContainersStatuses(), STATUS_TRANSFORM));
+    if (finished.isEmpty()) {
+      return;
+    }
+    handler.completed(finished);
+  }
+
+  @Override
+  public ContainerRequestBuilder addContainerRequest(Resource capability) {
+    return addContainerRequest(capability, 1);
+  }
+
+  @Override
+  public ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
+    return new ContainerRequestBuilder(adjustCapability(capability), count) {
+      @Override
+      public String apply() {
+        synchronized (Hadoop21YarnAMClient.this) {
+          String id = UUID.randomUUID().toString();
+
+          String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
+          String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
+
+          for (int i = 0; i < count; i++) {
+            AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, hosts, racks, priority);
+            containerRequests.put(id, request);
+            amrmClient.addContainerRequest(request);
+          }
+
+          return id;
+        }
+      }
+    };
+  }
+
+  @Override
+  public synchronized void completeContainerRequest(String id) {
+    for (AMRMClient.ContainerRequest request : containerRequests.removeAll(id)) {
+      amrmClient.removeContainerRequest(request);
+    }
+  }
+
+  /**
+   * Caps the virtual cores and memory of the given resource at the maximum capability returned by the
+   * resource manager when this application master registered. The given resource is modified in place.
+   *
+   * @param resource the requested capability; updated in place if it exceeds the maximum
+   * @return the same {@code resource} instance, after capping
+   */
+  private Resource adjustCapability(Resource resource) {
+    int cores = resource.getVirtualCores();
+    int updatedCores = Math.min(cores, maxCapability.getVirtualCores());
+
+    if (cores != updatedCores) {
+      resource.setVirtualCores(updatedCores);
+      LOG.info("Adjust virtual cores requirement from {} to {}.", cores, updatedCores);
+    }
+
+    // Remember the requested memory before overwriting it; reading it back after setMemory() would make
+    // the log below report the already adjusted value as the original one.
+    int memory = resource.getMemory();
+    int updatedMemory = Math.min(memory, maxCapability.getMemory());
+    if (memory != updatedMemory) {
+      resource.setMemory(updatedMemory);
+      LOG.info("Adjust memory requirement from {} to {} MB.", memory, updatedMemory);
+    }
+
+    return resource;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
new file mode 100644
index 0000000..50b212d
--- /dev/null
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationSubmitter;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link YarnAppClient} implementation for Hadoop 2.1, built on top of {@link YarnClient}.
+ */
+public final class Hadoop21YarnAppClient extends AbstractIdleService implements YarnAppClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAppClient.class);
+  private final YarnClient yarnClient;
+
+  public Hadoop21YarnAppClient(Configuration configuration) {
+    this.yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(configuration);
+  }
+
+  /**
+   * Asks YARN for a new application and returns a launcher for its application master.
+   * The submission itself happens when the submitter handed to the launcher is invoked.
+   */
+  @Override
+  public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception {
+    // Obtain a fresh application, together with its id, from the YARN client.
+    YarnClientApplication yarnApp = yarnClient.createApplication();
+    final GetNewApplicationResponse newAppResponse = yarnApp.getNewApplicationResponse();
+    final ApplicationId appId = newAppResponse.getApplicationId();
+
+    // Pre-populate the submission context; the remainder is filled in at submission time.
+    final ApplicationSubmissionContext submissionContext = yarnApp.getApplicationSubmissionContext();
+    submissionContext.setApplicationId(appId);
+    submissionContext.setApplicationName(twillSpec.getName());
+
+    return new ApplicationMasterProcessLauncher(appId, new ApplicationSubmitter() {
+      @Override
+      public ProcessController<YarnApplicationReport> submit(YarnLaunchContext context, Resource capability) {
+        ContainerLaunchContext amSpec = context.getLaunchContext();
+        addRMToken(amSpec);
+
+        submissionContext.setAMContainerSpec(amSpec);
+        submissionContext.setResource(adjustMemory(newAppResponse, capability));
+        submissionContext.setMaxAppAttempts(2);
+
+        try {
+          yarnClient.submitApplication(submissionContext);
+          return new ProcessControllerImpl(yarnClient, appId);
+        } catch (Exception e) {
+          LOG.error("Failed to submit application {}", appId, e);
+          throw Throwables.propagate(e);
+        }
+      }
+    });
+  }
+
+  /**
+   * Caps the memory of the given capability at the maximum reported in the new-application response.
+   * The capability is modified in place and returned.
+   */
+  private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
+    int memoryCeiling = response.getMaximumResourceCapability().getMemory();
+    if (capability.getMemory() > memoryCeiling) {
+      capability.setMemory(memoryCeiling);
+    }
+    return capability;
+  }
+
+  /**
+   * Adds a resource manager delegation token to the credentials carried by the given launch context.
+   * Does nothing when Hadoop security is disabled.
+   */
+  private void addRMToken(ContainerLaunchContext context) {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      // Nothing to add on a non-secure cluster.
+      return;
+    }
+
+    try {
+      Credentials launchCredentials = YarnUtils.decodeCredentials(context.getTokens());
+
+      Configuration yarnConf = yarnClient.getConfig();
+      Text renewer = new Text(YarnUtils.getYarnTokenRenewer(yarnConf));
+      Token<TokenIdentifier> rmToken = ConverterUtils.convertFromYarn(
+        yarnClient.getRMDelegationToken(renewer), YarnUtils.getRMAddress(yarnConf));
+
+      LOG.info("Added RM delegation token {}", rmToken);
+      launchCredentials.addToken(rmToken.getService(), rmToken);
+
+      // Write the augmented credentials back into the launch context.
+      context.setTokens(YarnUtils.encodeCredentials(launchCredentials));
+    } catch (Exception e) {
+      LOG.error("Fails to create credentials.", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception {
+    // Ignore user
+    return createLauncher(twillSpec);
+  }
+
+  @Override
+  public ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId) {
+    return new ProcessControllerImpl(yarnClient, appId);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    yarnClient.start();
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    yarnClient.stop();
+  }
+
+  private static final class ProcessControllerImpl implements ProcessController<YarnApplicationReport> {
+    private final YarnClient yarnClient;
+    private final ApplicationId appId;
+
+    public ProcessControllerImpl(YarnClient yarnClient, ApplicationId appId) {
+      this.yarnClient = yarnClient;
+      this.appId = appId;
+    }
+
+    @Override
+    public YarnApplicationReport getReport() {
+      try {
+        return new Hadoop21YarnApplicationReport(yarnClient.getApplicationReport(appId));
+      } catch (Exception e) {
+        LOG.error("Failed to get application report {}", appId, e);
+        throw Throwables.propagate(e);
+      }
+    }
+
+    @Override
+    public void cancel() {
+      try {
+        yarnClient.killApplication(appId);
+      } catch (Exception e) {
+        LOG.error("Failed to kill application {}", appId, e);
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java
new file mode 100644
index 0000000..6e614f5
--- /dev/null
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+/**
+ * {@link YarnApplicationReport} implementation for Hadoop 2.1. Every accessor delegates to the method of
+ * the same name on the wrapped {@link ApplicationReport}.
+ */
+public final class Hadoop21YarnApplicationReport implements YarnApplicationReport {
+
+  private final ApplicationReport report;
+
+  public Hadoop21YarnApplicationReport(ApplicationReport report) {
+    this.report = report;
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    return report.getApplicationId();
+  }
+
+  @Override
+  public ApplicationAttemptId getCurrentApplicationAttemptId() {
+    return report.getCurrentApplicationAttemptId();
+  }
+
+  @Override
+  public String getQueue() {
+    return report.getQueue();
+  }
+
+  @Override
+  public String getName() {
+    return report.getName();
+  }
+
+  @Override
+  public String getHost() {
+    return report.getHost();
+  }
+
+  @Override
+  public int getRpcPort() {
+    return report.getRpcPort();
+  }
+
+  @Override
+  public YarnApplicationState getYarnApplicationState() {
+    return report.getYarnApplicationState();
+  }
+
+  @Override
+  public String getDiagnostics() {
+    return report.getDiagnostics();
+  }
+
+  @Override
+  public String getTrackingUrl() {
+    return report.getTrackingUrl();
+  }
+
+  @Override
+  public String getOriginalTrackingUrl() {
+    return report.getOriginalTrackingUrl();
+  }
+
+  @Override
+  public long getStartTime() {
+    return report.getStartTime();
+  }
+
+  @Override
+  public long getFinishTime() {
+    return report.getFinishTime();
+  }
+
+  @Override
+  public FinalApplicationStatus getFinalApplicationStatus() {
+    return report.getFinalApplicationStatus();
+  }
+
+  @Override
+  public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
+    return report.getApplicationResourceUsageReport();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
new file mode 100644
index 0000000..86903c1
--- /dev/null
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * {@link YarnContainerInfo} implementation for Hadoop 2.1 that exposes the id, node host and port, memory
+ * and virtual cores of a wrapped YARN {@link Container}.
+ */
+public final class Hadoop21YarnContainerInfo implements YarnContainerInfo {
+
+  private final Container container;
+
+  public Hadoop21YarnContainerInfo(Container container) {
+    this.container = container;
+  }
+
+  @Override
+  public <T> T getContainer() {
+    return (T) container; // unchecked: the caller picks T, the wrapped object is always a Container
+  }
+
+  @Override
+  public String getId() {
+    return container.getId().toString();
+  }
+
+  @Override
+  public InetAddress getHost() {
+    try {
+      return InetAddress.getByName(container.getNodeId().getHost()); // resolves the node's host name
+    } catch (UnknownHostException e) {
+      throw Throwables.propagate(e); // rethrown unchecked, as the method declares no checked exception
+    }
+  }
+
+  @Override
+  public int getPort() {
+    return container.getNodeId().getPort();
+  }
+
+  @Override
+  public int getMemoryMB() {
+    return container.getResource().getMemory();
+  }
+
+  @Override
+  public int getVirtualCores() {
+    return container.getResource().getVirtualCores();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java
new file mode 100644
index 0000000..f5758c7
--- /dev/null
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+/**
+ * {@link YarnContainerStatus} implementation for Hadoop 2.1 that delegates to a wrapped
+ * {@link ContainerStatus}; the container id is exposed in its string form.
+ */
+public final class Hadoop21YarnContainerStatus implements YarnContainerStatus {
+
+  private final ContainerStatus containerStatus;
+
+  public Hadoop21YarnContainerStatus(ContainerStatus containerStatus) {
+    this.containerStatus = containerStatus;
+  }
+
+  @Override
+  public String getContainerId() {
+    return containerStatus.getContainerId().toString();
+  }
+
+  @Override
+  public ContainerState getState() {
+    return containerStatus.getState();
+  }
+
+  @Override
+  public int getExitStatus() {
+    return containerStatus.getExitStatus();
+  }
+
+  @Override
+  public String getDiagnostics() {
+    return containerStatus.getDiagnostics();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java
new file mode 100644
index 0000000..8621f93
--- /dev/null
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public final class Hadoop21YarnLaunchContext implements YarnLaunchContext {
+
+  private static final Function<YarnLocalResource, LocalResource> RESOURCE_TRANSFORM;
+
+  static {
+    // Creates transform function from YarnLocalResource -> LocalResource
+    RESOURCE_TRANSFORM = new Function<YarnLocalResource, LocalResource>() {
+      @Override
+      public LocalResource apply(YarnLocalResource input) {
+        return input.getLocalResource();
+      }
+    };
+  }
+
+  private final ContainerLaunchContext launchContext;
+
+  public Hadoop21YarnLaunchContext() {
+    launchContext = Records.newRecord(ContainerLaunchContext.class);
+  }
+
+  @Override
+  public <T> T getLaunchContext() {
+    return (T) launchContext;
+  }
+
+  @Override
+  public void setCredentials(Credentials credentials) {
+    launchContext.setTokens(YarnUtils.encodeCredentials(credentials));
+  }
+
+  @Override
+  public void setLocalResources(Map<String, YarnLocalResource> localResources) {
+    launchContext.setLocalResources(Maps.transformValues(localResources, RESOURCE_TRANSFORM));
+  }
+
+  @Override
+  public void setServiceData(Map<String, ByteBuffer> serviceData) {
+    launchContext.setServiceData(serviceData);
+  }
+
+  @Override
+  public Map<String, String> getEnvironment() {
+    return launchContext.getEnvironment();
+  }
+
+  @Override
+  public void setEnvironment(Map<String, String> environment) {
+    launchContext.setEnvironment(environment);
+  }
+
+  @Override
+  public List<String> getCommands() {
+    return launchContext.getCommands();
+  }
+
+  @Override
+  public void setCommands(List<String> commands) {
+    launchContext.setCommands(commands);
+  }
+
+  @Override
+  public void setApplicationACLs(Map<ApplicationAccessType, String> acls) {
+    launchContext.setApplicationACLs(acls);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java
new file mode 100644
index 0000000..3f756bd
--- /dev/null
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * A {@link YarnLocalResource} that wraps a YARN {@link LocalResource} record. All getters and
+ * setters forward to the wrapped record.
+ */
+public final class Hadoop21YarnLocalResource implements YarnLocalResource {
+
+  private final LocalResource delegate;
+
+  /**
+   * Creates a wrapper around a new, empty {@link LocalResource} record.
+   */
+  public Hadoop21YarnLocalResource() {
+    this.delegate = Records.newRecord(LocalResource.class);
+  }
+
+  /**
+   * Returns the wrapped {@link LocalResource}, cast to the type the caller asks for.
+   * The cast is unchecked; asking for any other type fails at the call site.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> T getLocalResource() {
+    return (T) delegate;
+  }
+
+  /**
+   * @return the resource URL of the wrapped record.
+   */
+  @Override
+  public URL getResource() {
+    return delegate.getResource();
+  }
+
+  /**
+   * Sets the resource URL on the wrapped record.
+   */
+  @Override
+  public void setResource(URL resource) {
+    delegate.setResource(resource);
+  }
+
+  /**
+   * @return the size stored in the wrapped record.
+   */
+  @Override
+  public long getSize() {
+    return delegate.getSize();
+  }
+
+  /**
+   * Sets the size on the wrapped record.
+   */
+  @Override
+  public void setSize(long size) {
+    delegate.setSize(size);
+  }
+
+  /**
+   * @return the timestamp stored in the wrapped record.
+   */
+  @Override
+  public long getTimestamp() {
+    return delegate.getTimestamp();
+  }
+
+  /**
+   * Sets the timestamp on the wrapped record.
+   */
+  @Override
+  public void setTimestamp(long timestamp) {
+    delegate.setTimestamp(timestamp);
+  }
+
+  /**
+   * @return the resource type stored in the wrapped record.
+   */
+  @Override
+  public LocalResourceType getType() {
+    return delegate.getType();
+  }
+
+  /**
+   * Sets the resource type on the wrapped record.
+   */
+  @Override
+  public void setType(LocalResourceType type) {
+    delegate.setType(type);
+  }
+
+  /**
+   * @return the visibility stored in the wrapped record.
+   */
+  @Override
+  public LocalResourceVisibility getVisibility() {
+    return delegate.getVisibility();
+  }
+
+  /**
+   * Sets the visibility on the wrapped record.
+   */
+  @Override
+  public void setVisibility(LocalResourceVisibility visibility) {
+    delegate.setVisibility(visibility);
+  }
+
+  /**
+   * @return the pattern stored in the wrapped record.
+   */
+  @Override
+  public String getPattern() {
+    return delegate.getPattern();
+  }
+
+  /**
+   * Sets the pattern on the wrapped record.
+   */
+  @Override
+  public void setPattern(String pattern) {
+    delegate.setPattern(pattern);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
new file mode 100644
index 0000000..d3a6a80
--- /dev/null
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.common.Cancellable;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link YarnNMClient} built on top of the YARN {@link NMClient}. The wrapped client is
+ * initialized in the constructor and is started and stopped together with this service.
+ */
+public final class Hadoop21YarnNMClient extends AbstractIdleService implements YarnNMClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnNMClient.class);
+
+  private final NMClient nmClient;
+
+  /**
+   * @param configuration configuration used to initialize the underlying {@link NMClient}.
+   */
+  public Hadoop21YarnNMClient(Configuration configuration) {
+    this.nmClient = NMClient.createNMClient();
+    nmClient.init(configuration);
+  }
+
+  /**
+   * Starts the container described by {@code containerInfo} using the given launch context.
+   *
+   * @param containerInfo provides the YARN {@link Container} to start.
+   * @param launchContext provides the {@link ContainerLaunchContext} to start it with.
+   * @return a {@link Cancellable} that stops the container when cancelled.
+   * @throws RuntimeException if starting the container fails; the failure is logged and then
+   *         rethrown through {@link Throwables#propagate(Throwable)}.
+   */
+  @Override
+  public Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext launchContext) {
+    try {
+      Container container = containerInfo.getContainer();
+      nmClient.startContainer(container, launchContext.<ContainerLaunchContext>getLaunchContext());
+      return new ContainerTerminator(container, nmClient);
+    } catch (Exception e) {
+      LOG.error("Error in launching process", e);
+      throw Throwables.propagate(e);
+    }
+
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    nmClient.start();
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    nmClient.stop();
+  }
+
+  /**
+   * Stops a single container and waits until its state is reported as
+   * {@link ContainerState#COMPLETE}.
+   */
+  private static final class ContainerTerminator implements Cancellable {
+
+    /** Pause between two container status polls while waiting for the container to complete. */
+    private static final long STATUS_POLL_INTERVAL_MS = 200L;
+
+    private final Container container;
+    private final NMClient nmClient;
+
+    private ContainerTerminator(Container container, NMClient nmClient) {
+      this.container = container;
+      this.nmClient = nmClient;
+    }
+
+    /**
+     * Requests the container to stop, then polls its status until it is
+     * {@link ContainerState#COMPLETE}.
+     *
+     * @throws RuntimeException if stopping or polling fails, or if the calling thread is
+     *         interrupted while waiting; in the latter case the interrupt flag is restored.
+     */
+    @Override
+    public void cancel() {
+      LOG.info("Request to stop container {}.", container.getId());
+
+      try {
+        nmClient.stopContainer(container.getId(), container.getNodeId());
+        while (true) {
+          ContainerStatus status = nmClient.getContainerStatus(container.getId(), container.getNodeId());
+          LOG.info("Container status: {} {}", status, status.getDiagnostics());
+
+          if (status.getState() == ContainerState.COMPLETE) {
+            break;
+          }
+          // Pause between polls so the loop does not spin the CPU, hammer the NodeManager
+          // and flood the log while the container is shutting down.
+          Thread.sleep(STATUS_POLL_INTERVAL_MS);
+        }
+        LOG.info("Container {} stopped.", container.getId());
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so that callers up the stack can still observe it.
+        Thread.currentThread().interrupt();
+        LOG.error("Fail to stop container {}", container.getId(), e);
+        throw Throwables.propagate(e);
+      } catch (Exception e) {
+        LOG.error("Fail to stop container {}", container.getId(), e);
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
new file mode 100644
index 0000000..b0eeb43
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.filesystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.UUID;
+
+/**
+ * A concrete implementation of {@link Location} for the HDFS filesystem.
+ */
+final class HDFSLocation implements Location {
+  private final FileSystem fs;
+  private final Path path;
+
+  /**
+   * Constructs a HDFSLocation.
+   *
+   * @param fs  An instance of {@link FileSystem}
+   * @param path of the file.
+   */
+  HDFSLocation(FileSystem fs, Path path) {
+    this.fs = fs;
+    this.path = path;
+  }
+
+  /**
+   * Checks if this location exists on HDFS.
+   *
+   * @return true if found; false otherwise.
+   * @throws IOException
+   */
+  @Override
+  public boolean exists() throws IOException {
+    return fs.exists(path);
+  }
+
+  /**
+   * @return An {@link InputStream} for this location on HDFS.
+   * @throws IOException
+   */
+  @Override
+  public InputStream getInputStream() throws IOException {
+    return fs.open(path);
+  }
+
+  /**
+   * @return An {@link OutputStream} for this location on HDFS.
+   * @throws IOException
+   */
+  @Override
+  public OutputStream getOutputStream() throws IOException {
+    return fs.create(path);
+  }
+
+  @Override
+  public OutputStream getOutputStream(String permission) throws IOException {
+    Configuration conf = fs.getConf();
+    return fs.create(path,
+                     new FsPermission(permission),
+                     true,
+                     conf.getInt("io.file.buffer.size", 4096),
+                     fs.getDefaultReplication(path),
+                     fs.getDefaultBlockSize(path),
+                     null);
+  }
+
+  /**
+   * Appends the child to the current {@link Location} on HDFS.
+   * <p>
+   * Returns a new instance of Location.
+   * </p>
+   *
+   * @param child to be appended to this location.
+   * @return A new instance of {@link Location}
+   * @throws IOException
+   */
+  @Override
+  public Location append(String child) throws IOException {
+    if (child.startsWith("/")) {
+      child = child.substring(1);
+    }
+    return new HDFSLocation(fs, new Path(URI.create(path.toUri() + "/" + child)));
+  }
+
+  @Override
+  public Location getTempFile(String suffix) throws IOException {
+    Path path = new Path(
+      URI.create(this.path.toUri() + "." + UUID.randomUUID() + (suffix == null ? TEMP_FILE_SUFFIX : suffix)));
+    return new HDFSLocation(fs, path);
+  }
+
+  /**
+   * @return Returns the name of the file or directory denoteed by this abstract pathname.
+   */
+  @Override
+  public String getName() {
+    return path.getName();
+  }
+
+  @Override
+  public boolean createNew() throws IOException {
+    return fs.createNewFile(path);
+  }
+
+  /**
+   * @return A {@link URI} for this location on HDFS.
+   */
+  @Override
+  public URI toURI() {
+    return path.toUri();
+  }
+
+  /**
+   * Deletes the file or directory denoted by this abstract pathname. If this
+   * pathname denotes a directory, then the directory must be empty in order
+   * to be deleted.
+   *
+   * @return true if and only if the file or directory is successfully deleted; false otherwise.
+   */
+  @Override
+  public boolean delete() throws IOException {
+    return fs.delete(path, false);
+  }
+
+  @Override
+  public boolean delete(boolean recursive) throws IOException {
+    return fs.delete(path, true);
+  }
+
+  @Override
+  public Location renameTo(Location destination) throws IOException {
+    // Destination will always be of the same type as this location.
+    if (fs instanceof DistributedFileSystem) {
+      ((DistributedFileSystem) fs).rename(path, ((HDFSLocation) destination).path, Options.Rename.OVERWRITE);
+      return new HDFSLocation(fs, new Path(destination.toURI()));
+    }
+
+    if (fs.rename(path, ((HDFSLocation) destination).path)) {
+      return new HDFSLocation(fs, new Path(destination.toURI()));
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Creates the directory named by this abstract pathname, including any necessary
+   * but nonexistent parent directories.
+   *
+   * @return true if and only if the renaming succeeded; false otherwise
+   */
+  @Override
+  public boolean mkdirs() throws IOException {
+    return fs.mkdirs(path);
+  }
+
+  /**
+   * @return Length of file.
+   */
+  @Override
+  public long length() throws IOException {
+    return fs.getFileStatus(path).getLen();
+  }
+
+  @Override
+  public long lastModified() throws IOException {
+    return fs.getFileStatus(path).getModificationTime();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
new file mode 100644
index 0000000..fa79391
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.filesystem;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A {@link LocationFactory} that creates HDFS {@link Location}.
+ */
+public final class HDFSLocationFactory implements LocationFactory {
+
+  private final FileSystem fileSystem;
+  private final String pathBase;
+
+  public HDFSLocationFactory(Configuration configuration) {
+    this(getFileSystem(configuration));
+  }
+  
+  public HDFSLocationFactory(Configuration configuration, String pathBase) {
+    this(getFileSystem(configuration), pathBase);
+  }
+
+  public HDFSLocationFactory(FileSystem fileSystem) {
+    this(fileSystem, "/");
+  }
+
+  public HDFSLocationFactory(FileSystem fileSystem, String pathBase) {
+    String base = pathBase.equals("/") ? "" : pathBase;
+    base = base.endsWith("/") ? base.substring(0, base.length() - 1) : base;
+
+    this.fileSystem = fileSystem;
+    this.pathBase = base;
+  }
+
+  @Override
+  public Location create(String path) {
+    if (path.startsWith("/")) {
+      path = path.substring(1);
+    }
+    return new HDFSLocation(fileSystem, new Path(fileSystem.getUri() + "/" + pathBase + "/" + path));
+  }
+
+  @Override
+  public Location create(URI uri) {
+    if (!uri.toString().startsWith(fileSystem.getUri().toString())) {
+      // It's a full URI
+      return new HDFSLocation(fileSystem, new Path(uri));
+    }
+    if (uri.isAbsolute()) {
+      return new HDFSLocation(fileSystem, new Path(fileSystem.getUri() + uri.getPath()));
+    }
+    return new HDFSLocation(fileSystem, new Path(fileSystem.getUri() + "/" + pathBase + "/" + uri.getPath()));
+  }
+
+  @Override
+  public Location getHomeLocation() {
+    return new HDFSLocation(fileSystem, fileSystem.getHomeDirectory());
+  }
+
+  /**
+   * Returns the underlying {@link FileSystem} object.
+   */
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  private static FileSystem getFileSystem(Configuration configuration) {
+    try {
+      return FileSystem.get(configuration);
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-yarn/src/main/java/org/apache/twill/filesystem/package-info.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/package-info.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/package-info.java
new file mode 100644
index 0000000..2ca09fd
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Contains HDFS location classes.
+ */
+package org.apache.twill.filesystem;