You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:23 UTC

[07/28] [TWILL-14] Bootstrapping for the site generation. Reorganization of the source tree happens:

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
deleted file mode 100644
index c1bd75a..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AMRMClientImpl.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn.ports;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-/**
- * Ported from Apache Hadoop YARN.
- */
-public final class AMRMClientImpl extends AbstractService implements AMRMClient {
-
-  private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
-
-  private final RecordFactory recordFactory =
-    RecordFactoryProvider.getRecordFactory(null);
-
-  private int lastResponseId = 0;
-
-  protected AMRMProtocol rmClient;
-  protected final ApplicationAttemptId appAttemptId;
-  protected Resource clusterAvailableResources;
-  protected int clusterNodeCount;
-
-  //Key -> Priority
-  //Value -> Map
-  //Key->ResourceName (e.g., hostname, rackname, *)
-  //Value->Map
-  //Key->Resource Capability
-  //Value->ResourceRequest
-  protected final
-  Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
-    remoteRequestsTable =
-    new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
-
-  protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
-    new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
-  protected final Set<ContainerId> release = new TreeSet<ContainerId>();
-
-  public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
-    super(AMRMClientImpl.class.getName());
-    this.appAttemptId = appAttemptId;
-  }
-
-  @Override
-  public synchronized void init(Configuration conf) {
-    super.init(conf);
-  }
-
-  @Override
-  public synchronized void start() {
-    final YarnConfiguration conf = new YarnConfiguration(getConfig());
-    final YarnRPC rpc = YarnRPC.create(conf);
-    final InetSocketAddress rmAddress = conf.getSocketAddr(
-      YarnConfiguration.RM_SCHEDULER_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-
-    UserGroupInformation currentUser;
-    try {
-      currentUser = UserGroupInformation.getCurrentUser();
-    } catch (IOException e) {
-      throw new YarnException(e);
-    }
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      String tokenURLEncodedStr = System.getenv().get(
-        ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
-      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
-
-      try {
-        token.decodeFromUrlString(tokenURLEncodedStr);
-      } catch (IOException e) {
-        throw new YarnException(e);
-      }
-
-      SecurityUtil.setTokenService(token, rmAddress);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("AppMasterToken is " + token);
-      }
-      currentUser.addToken(token);
-    }
-
-    rmClient = currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
-      @Override
-      public AMRMProtocol run() {
-        return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress,
-                                           conf);
-      }
-    });
-    LOG.debug("Connecting to ResourceManager at " + rmAddress);
-    super.start();
-  }
-
-  @Override
-  public synchronized void stop() {
-    if (this.rmClient != null) {
-      RPC.stopProxy(this.rmClient);
-    }
-    super.stop();
-  }
-
-  @Override
-  public RegisterApplicationMasterResponse registerApplicationMaster(
-    String appHostName, int appHostPort, String appTrackingUrl)
-    throws YarnRemoteException {
-    // do this only once ???
-    RegisterApplicationMasterRequest request = recordFactory
-      .newRecordInstance(RegisterApplicationMasterRequest.class);
-    synchronized (this) {
-      request.setApplicationAttemptId(appAttemptId);
-    }
-    request.setHost(appHostName);
-    request.setRpcPort(appHostPort);
-    if (appTrackingUrl != null) {
-      request.setTrackingUrl(appTrackingUrl);
-    }
-    RegisterApplicationMasterResponse response = rmClient
-      .registerApplicationMaster(request);
-    return response;
-  }
-
-  @Override
-  public AllocationResponse allocate(float progressIndicator)
-    throws YarnRemoteException {
-    AllocateResponse allocateResponse = null;
-    ArrayList<ResourceRequest> askList = null;
-    ArrayList<ContainerId> releaseList = null;
-    AllocateRequest allocateRequest = null;
-
-    try {
-      synchronized (this) {
-        askList = new ArrayList<ResourceRequest>(ask);
-        releaseList = new ArrayList<ContainerId>(release);
-        // optimistically clear this collection assuming no RPC failure
-        ask.clear();
-        release.clear();
-        allocateRequest = BuilderUtils
-          .newAllocateRequest(appAttemptId, lastResponseId, progressIndicator,
-                              askList, releaseList);
-      }
-
-      allocateResponse = rmClient.allocate(allocateRequest);
-      AllocationResponse response = AllocationResponses.create(allocateResponse);
-
-      synchronized (this) {
-        // update these on successful RPC
-        clusterNodeCount = allocateResponse.getNumClusterNodes();
-        lastResponseId = response.getResponseId();
-        clusterAvailableResources = response.getAvailableResources();
-      }
-
-      return response;
-    } finally {
-      // TODO how to differentiate remote YARN exception vs error in RPC
-      if (allocateResponse == null) {
-        // We hit an exception in allocate()
-        // Preserve ask and release for next call to allocate()
-        synchronized (this) {
-          release.addAll(releaseList);
-          // Requests could have been added or deleted during call to allocate.
-          // If requests were added/removed then there is nothing to do since
-          // the ResourceRequest object in ask would have the actual new value.
-          // If ask does not have this ResourceRequest then it was unchanged and
-          // so we can add the value back safely.
-          // This assumes that there will be no concurrent calls to allocate() and
-          // so we don't have to worry about ask being changed in the
-          // synchronized block at the beginning of this method.
-          for (ResourceRequest oldAsk : askList) {
-            if (!ask.contains(oldAsk)) {
-              ask.add(oldAsk);
-            }
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
-                                          String appMessage, String appTrackingUrl) throws YarnRemoteException {
-    FinishApplicationMasterRequest request = recordFactory
-      .newRecordInstance(FinishApplicationMasterRequest.class);
-    request.setAppAttemptId(appAttemptId);
-    request.setFinishApplicationStatus(appStatus);
-    if (appMessage != null) {
-      request.setDiagnostics(appMessage);
-    }
-    if (appTrackingUrl != null) {
-      request.setTrackingUrl(appTrackingUrl);
-    }
-    rmClient.finishApplicationMaster(request);
-  }
-
-  @Override
-  public synchronized void addContainerRequest(ContainerRequest req) {
-    // Create resource requests
-    if (req.hosts != null) {
-      for (String host : req.hosts) {
-        addResourceRequest(req.priority, host, req.capability, req.containerCount);
-      }
-    }
-
-    if (req.racks != null) {
-      for (String rack : req.racks) {
-        addResourceRequest(req.priority, rack, req.capability, req.containerCount);
-      }
-    }
-
-    // Off switch
-    addResourceRequest(req.priority, ANY, req.capability, req.containerCount);
-  }
-
-  @Override
-  public synchronized void removeContainerRequest(ContainerRequest req) {
-    // Update resource requests
-    if (req.hosts != null) {
-      for (String hostName : req.hosts) {
-        decResourceRequest(req.priority, hostName, req.capability, req.containerCount);
-      }
-    }
-
-    if (req.racks != null) {
-      for (String rack : req.racks) {
-        decResourceRequest(req.priority, rack, req.capability, req.containerCount);
-      }
-    }
-
-    decResourceRequest(req.priority, ANY, req.capability, req.containerCount);
-  }
-
-  @Override
-  public synchronized void releaseAssignedContainer(ContainerId containerId) {
-    release.add(containerId);
-  }
-
-  @Override
-  public synchronized Resource getClusterAvailableResources() {
-    return clusterAvailableResources;
-  }
-
-  @Override
-  public synchronized int getClusterNodeCount() {
-    return clusterNodeCount;
-  }
-
-  private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
-    // This code looks weird but is needed because of the following scenario.
-    // A ResourceRequest is removed from the remoteRequestsTable. A 0 container
-    // request is added to 'ask' to notify the RM about not needing it any more.
-    // Before the call to allocate, the user now requests more containers. If 
-    // the locations of the 0 size request and the new request are the same
-    // (with the difference being only container count), then the set comparator
-    // will consider both to be the same and not add the new request to ask. So 
-    // we need to check for the "same" request being present and remove it and 
-    // then add it back. The comparator is container count agnostic.
-    // This should happen only rarely but we do need to guard against it.
-    if (ask.contains(remoteRequest)) {
-      ask.remove(remoteRequest);
-    }
-    ask.add(remoteRequest);
-  }
-
-  private void addResourceRequest(Priority priority, String resourceName,
-                                  Resource capability, int containerCount) {
-    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
-      this.remoteRequestsTable.get(priority);
-    if (remoteRequests == null) {
-      remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
-      this.remoteRequestsTable.put(priority, remoteRequests);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Added priority=" + priority);
-      }
-    }
-    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
-    if (reqMap == null) {
-      reqMap = new HashMap<Resource, ResourceRequest>();
-      remoteRequests.put(resourceName, reqMap);
-    }
-    ResourceRequest remoteRequest = reqMap.get(capability);
-    if (remoteRequest == null) {
-      remoteRequest = BuilderUtils.
-        newResourceRequest(priority, resourceName, capability, 0);
-      reqMap.put(capability, remoteRequest);
-    }
-
-    remoteRequest.setNumContainers(remoteRequest.getNumContainers() + containerCount);
-
-    // Note this down for next interaction with ResourceManager
-    addResourceRequestToAsk(remoteRequest);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("addResourceRequest:" + " applicationId="
-                  + appAttemptId + " priority=" + priority.getPriority()
-                  + " resourceName=" + resourceName + " numContainers="
-                  + remoteRequest.getNumContainers() + " #asks=" + ask.size());
-    }
-  }
-
-  private void decResourceRequest(Priority priority, String resourceName,
-                                  Resource capability, int containerCount) {
-    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
-      this.remoteRequestsTable.get(priority);
-
-    if (remoteRequests == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not decrementing resource as priority " + priority
-                    + " is not present in request table");
-      }
-      return;
-    }
-
-    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
-    if (reqMap == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not decrementing resource as " + resourceName
-                    + " is not present in request table");
-      }
-      return;
-    }
-    ResourceRequest remoteRequest = reqMap.get(capability);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
-                  + appAttemptId + " priority=" + priority.getPriority()
-                  + " resourceName=" + resourceName + " numContainers="
-                  + remoteRequest.getNumContainers() + " #asks=" + ask.size());
-    }
-
-    remoteRequest.
-      setNumContainers(remoteRequest.getNumContainers() - containerCount);
-    if (remoteRequest.getNumContainers() < 0) {
-      // guard against spurious removals
-      remoteRequest.setNumContainers(0);
-    }
-    // Send the ResourceRequest to RM even if it is 0 because it needs to override
-    // a previously sent value. If ResourceRequest was not sent previously then
-    // sending 0 ought to be a no-op on RM.
-    addResourceRequestToAsk(remoteRequest);
-
-    // Delete entries from map if no longer needed.
-    if (remoteRequest.getNumContainers() == 0) {
-      reqMap.remove(capability);
-      if (reqMap.size() == 0) {
-        remoteRequests.remove(resourceName);
-      }
-      if (remoteRequests.size() == 0) {
-        remoteRequestsTable.remove(priority);
-      }
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.info("AFTER decResourceRequest:" + " applicationId="
-                 + appAttemptId + " priority=" + priority.getPriority()
-                 + " resourceName=" + resourceName + " numContainers="
-                 + remoteRequest.getNumContainers() + " #asks=" + ask.size());
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
deleted file mode 100644
index 89734fc..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponse.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn.ports;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-import java.util.List;
-
-/**
- * This interface abstracts the differences between vanilla Hadoop YARN 2.0 and CDH 4.4.
- */
-public interface AllocationResponse {
-
-  int getResponseId();
-
-  Resource getAvailableResources();
-
-  List<Container> getAllocatedContainers();
-
-  List<ContainerStatus> getCompletedContainersStatuses();
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java b/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
deleted file mode 100644
index ea46c3b..0000000
--- a/yarn/src/main/hadoop20/org/apache/twill/internal/yarn/ports/AllocationResponses.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn.ports;
-
-import com.google.common.base.Throwables;
-import com.google.common.reflect.TypeToken;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-import java.util.List;
-
-/**
- * Factory for building instances of {@link AllocationResponse} based on the response type.
- */
-public final class AllocationResponses {
-
-  /**
-   * A hack for CDH 4.4.0, as its AllocateResponse class has been rewritten and diverges from YARN 2.0.
-   */
-  private static final boolean IS_CDH_4_4;
-
-  static {
-    boolean result = false;
-    try {
-      try {
-        // See if it is standard YARN 2.0 AllocateResponse object.
-        AllocateResponse.class.getMethod("getAMResponse");
-      } catch (NoSuchMethodException e) {
-          // See if it is CDH 4.4 AllocateResponse object.
-        AllocationResponse.class.getMethod("getAllocatedContainers");
-        result = true;
-      }
-    } catch (Exception e) {
-      // Something is very wrong, as execution should never arrive here.
-      e.printStackTrace();
-      throw Throwables.propagate(e);
-    }
-
-    IS_CDH_4_4 = result;
-  }
-
-  public static AllocationResponse create(Object response) {
-    if (IS_CDH_4_4) {
-      return new ReflectionAllocationResponse(response);
-    }
-
-    try {
-      Object amResponse = response.getClass().getMethod("getAMResponse").invoke(response);
-      return new ReflectionAllocationResponse(amResponse);
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  private static final class ReflectionAllocationResponse implements AllocationResponse {
-
-    private final Object response;
-
-    private ReflectionAllocationResponse(Object response) {
-      this.response = response;
-    }
-
-    @Override
-    public int getResponseId() {
-      return call("getResponseId", TypeToken.of(Integer.class));
-    }
-
-    @Override
-    public Resource getAvailableResources() {
-      return call("getAvailableResources", TypeToken.of(Resource.class));
-    }
-
-    @Override
-    public List<Container> getAllocatedContainers() {
-      return call("getAllocatedContainers", new TypeToken<List<Container>>() {});
-    }
-
-    @Override
-    public List<ContainerStatus> getCompletedContainersStatuses() {
-      return call("getCompletedContainersStatuses", new TypeToken<List<ContainerStatus>>() {});
-    }
-
-    private <T> T call(String methodName, TypeToken<T> resultType) {
-      try {
-        return (T) resultType.getRawType().cast(response.getClass().getMethod(methodName).invoke(response));
-      } catch (Exception e) {
-        throw Throwables.propagate(e);
-      }
-    }
-  }
-
-  private AllocationResponses() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
deleted file mode 100644
index ce8f90f..0000000
--- a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.internal.ProcessLauncher;
-import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.AbstractIdleService;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.List;
-import java.util.UUID;
-
-/**
- *
- */
-public final class Hadoop21YarnAMClient extends AbstractIdleService implements YarnAMClient {
-
-  private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAMClient.class);
-
-  private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
-
-  static {
-    STATUS_TRANSFORM = new Function<ContainerStatus, YarnContainerStatus>() {
-      @Override
-      public YarnContainerStatus apply(ContainerStatus status) {
-        return new Hadoop21YarnContainerStatus(status);
-      }
-    };
-  }
-
-  private final ContainerId containerId;
-  private final Multimap<String, AMRMClient.ContainerRequest> containerRequests;
-  private final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
-  private final Hadoop21YarnNMClient nmClient;
-  private InetSocketAddress trackerAddr;
-  private URL trackerUrl;
-  private Resource maxCapability;
-
-  public Hadoop21YarnAMClient(Configuration conf) {
-    String masterContainerId = System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.name());
-    Preconditions.checkArgument(masterContainerId != null,
-                                "Missing %s from environment", ApplicationConstants.Environment.CONTAINER_ID.name());
-    this.containerId = ConverterUtils.toContainerId(masterContainerId);
-    this.containerRequests = ArrayListMultimap.create();
-
-    this.amrmClient = AMRMClient.createAMRMClient();
-    this.amrmClient.init(conf);
-    this.nmClient = new Hadoop21YarnNMClient(conf);
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
-    Preconditions.checkNotNull(trackerUrl, "Tracker URL not set.");
-
-    amrmClient.start();
-    RegisterApplicationMasterResponse response = amrmClient.registerApplicationMaster(trackerAddr.getHostName(),
-                                                                                      trackerAddr.getPort(),
-                                                                                      trackerUrl.toString());
-    maxCapability = response.getMaximumResourceCapability();
-    nmClient.startAndWait();
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    nmClient.stopAndWait();
-    amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, trackerUrl.toString());
-    amrmClient.stop();
-  }
-
-  @Override
-  public ContainerId getContainerId() {
-    return containerId;
-  }
-
-  @Override
-  public String getHost() {
-    return System.getenv().get(ApplicationConstants.Environment.NM_HOST.name());
-  }
-
-  @Override
-  public void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
-    this.trackerAddr = trackerAddr;
-    this.trackerUrl = trackerUrl;
-  }
-
-  @Override
-  public synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
-    AllocateResponse allocateResponse = amrmClient.allocate(progress);
-    List<ProcessLauncher<YarnContainerInfo>> launchers
-      = Lists.newArrayListWithCapacity(allocateResponse.getAllocatedContainers().size());
-
-    for (Container container : allocateResponse.getAllocatedContainers()) {
-      launchers.add(new RunnableProcessLauncher(new Hadoop21YarnContainerInfo(container), nmClient));
-    }
-
-    if (!launchers.isEmpty()) {
-      handler.acquired(launchers);
-
-      // If no process has been launched through the given launcher, return the container.
-      for (ProcessLauncher<YarnContainerInfo> l : launchers) {
-        // This cast always works.
-        RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
-        if (!launcher.isLaunched()) {
-          Container container = launcher.getContainerInfo().getContainer();
-          LOG.info("Nothing to run in container, releasing it: {}", container);
-          amrmClient.releaseAssignedContainer(container.getId());
-        }
-      }
-    }
-
-    List<YarnContainerStatus> completed = ImmutableList.copyOf(
-      Iterables.transform(allocateResponse.getCompletedContainersStatuses(), STATUS_TRANSFORM));
-    if (!completed.isEmpty()) {
-      handler.completed(completed);
-    }
-  }
-
-  @Override
-  public ContainerRequestBuilder addContainerRequest(Resource capability) {
-    return addContainerRequest(capability, 1);
-  }
-
-  @Override
-  public ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
-    return new ContainerRequestBuilder(adjustCapability(capability), count) {
-      @Override
-      public String apply() {
-        synchronized (Hadoop21YarnAMClient.this) {
-          String id = UUID.randomUUID().toString();
-
-          String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
-          String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
-
-          for (int i = 0; i < count; i++) {
-            AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, hosts, racks, priority);
-            containerRequests.put(id, request);
-            amrmClient.addContainerRequest(request);
-          }
-
-          return id;
-        }
-      }
-    };
-  }
-
-  @Override
-  public synchronized void completeContainerRequest(String id) {
-    for (AMRMClient.ContainerRequest request : containerRequests.removeAll(id)) {
-      amrmClient.removeContainerRequest(request);
-    }
-  }
-
-  private Resource adjustCapability(Resource resource) {
-    int cores = resource.getVirtualCores();
-    int updatedCores = Math.min(resource.getVirtualCores(), maxCapability.getVirtualCores());
-
-    if (cores != updatedCores) {
-      resource.setVirtualCores(updatedCores);
-      LOG.info("Adjust virtual cores requirement from {} to {}.", cores, updatedCores);
-    }
-
-    int updatedMemory = Math.min(resource.getMemory(), maxCapability.getMemory());
-    if (resource.getMemory() != updatedMemory) {
-      resource.setMemory(updatedMemory);
-      LOG.info("Adjust memory requirement from {} to {} MB.", resource.getMemory(), updatedMemory);
-    }
-
-    return resource;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
deleted file mode 100644
index 50b212d..0000000
--- a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.internal.ProcessController;
-import org.apache.twill.internal.ProcessLauncher;
-import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
-import org.apache.twill.internal.appmaster.ApplicationSubmitter;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.AbstractIdleService;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public final class Hadoop21YarnAppClient extends AbstractIdleService implements YarnAppClient {
-
-  private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAppClient.class);
-  private final YarnClient yarnClient;
-
-  public Hadoop21YarnAppClient(Configuration configuration) {
-    this.yarnClient = YarnClient.createYarnClient();
-    yarnClient.init(configuration);
-  }
-
-  @Override
-  public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec) throws Exception {
-    // Request for new application
-    YarnClientApplication application = yarnClient.createApplication();
-    final GetNewApplicationResponse response = application.getNewApplicationResponse();
-    final ApplicationId appId = response.getApplicationId();
-
-    // Setup the context for application submission
-    final ApplicationSubmissionContext appSubmissionContext = application.getApplicationSubmissionContext();
-    appSubmissionContext.setApplicationId(appId);
-    appSubmissionContext.setApplicationName(twillSpec.getName());
-
-    ApplicationSubmitter submitter = new ApplicationSubmitter() {
-      @Override
-      public ProcessController<YarnApplicationReport> submit(YarnLaunchContext context, Resource capability) {
-        ContainerLaunchContext launchContext = context.getLaunchContext();
-
-        addRMToken(launchContext);
-        appSubmissionContext.setAMContainerSpec(launchContext);
-        appSubmissionContext.setResource(adjustMemory(response, capability));
-        appSubmissionContext.setMaxAppAttempts(2);
-
-        try {
-          yarnClient.submitApplication(appSubmissionContext);
-          return new ProcessControllerImpl(yarnClient, appId);
-        } catch (Exception e) {
-          LOG.error("Failed to submit application {}", appId, e);
-          throw Throwables.propagate(e);
-        }
-      }
-    };
-
-    return new ApplicationMasterProcessLauncher(appId, submitter);
-  }
-
-  private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
-    int maxMemory = response.getMaximumResourceCapability().getMemory();
-    int updatedMemory = capability.getMemory();
-
-    if (updatedMemory > maxMemory) {
-      capability.setMemory(maxMemory);
-    }
-
-    return capability;
-  }
-
-  private void addRMToken(ContainerLaunchContext context) {
-    if (!UserGroupInformation.isSecurityEnabled()) {
-      return;
-    }
-
-    try {
-      Credentials credentials = YarnUtils.decodeCredentials(context.getTokens());
-
-      Configuration config = yarnClient.getConfig();
-      Token<TokenIdentifier> token = ConverterUtils.convertFromYarn(
-        yarnClient.getRMDelegationToken(new Text(YarnUtils.getYarnTokenRenewer(config))),
-        YarnUtils.getRMAddress(config));
-
-      LOG.info("Added RM delegation token {}", token);
-      credentials.addToken(token.getService(), token);
-
-      context.setTokens(YarnUtils.encodeCredentials(credentials));
-
-    } catch (Exception e) {
-      LOG.error("Fails to create credentials.", e);
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception {
-    // Ignore user
-    return createLauncher(twillSpec);
-  }
-
-  @Override
-  public ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId) {
-    return new ProcessControllerImpl(yarnClient, appId);
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    yarnClient.start();
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    yarnClient.stop();
-  }
-
-  private static final class ProcessControllerImpl implements ProcessController<YarnApplicationReport> {
-    private final YarnClient yarnClient;
-    private final ApplicationId appId;
-
-    public ProcessControllerImpl(YarnClient yarnClient, ApplicationId appId) {
-      this.yarnClient = yarnClient;
-      this.appId = appId;
-    }
-
-    @Override
-    public YarnApplicationReport getReport() {
-      try {
-        return new Hadoop21YarnApplicationReport(yarnClient.getApplicationReport(appId));
-      } catch (Exception e) {
-        LOG.error("Failed to get application report {}", appId, e);
-        throw Throwables.propagate(e);
-      }
-    }
-
-    @Override
-    public void cancel() {
-      try {
-        yarnClient.killApplication(appId);
-      } catch (Exception e) {
-        LOG.error("Failed to kill application {}", appId, e);
-        throw Throwables.propagate(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java
deleted file mode 100644
index 6e614f5..0000000
--- a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnApplicationReport.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-
-/**
- *
- */
-public final class Hadoop21YarnApplicationReport implements YarnApplicationReport {
-
-  private final ApplicationReport report;
-
-  public Hadoop21YarnApplicationReport(ApplicationReport report) {
-    this.report = report;
-  }
-
-  @Override
-  public ApplicationId getApplicationId() {
-    return report.getApplicationId();
-  }
-
-  @Override
-  public ApplicationAttemptId getCurrentApplicationAttemptId() {
-    return report.getCurrentApplicationAttemptId();
-  }
-
-  @Override
-  public String getQueue() {
-    return report.getQueue();
-  }
-
-  @Override
-  public String getName() {
-    return report.getName();
-  }
-
-  @Override
-  public String getHost() {
-    return report.getHost();
-  }
-
-  @Override
-  public int getRpcPort() {
-    return report.getRpcPort();
-  }
-
-  @Override
-  public YarnApplicationState getYarnApplicationState() {
-    return report.getYarnApplicationState();
-  }
-
-  @Override
-  public String getDiagnostics() {
-    return report.getDiagnostics();
-  }
-
-  @Override
-  public String getTrackingUrl() {
-    return report.getTrackingUrl();
-  }
-
-  @Override
-  public String getOriginalTrackingUrl() {
-    return report.getOriginalTrackingUrl();
-  }
-
-  @Override
-  public long getStartTime() {
-    return report.getStartTime();
-  }
-
-  @Override
-  public long getFinishTime() {
-    return report.getFinishTime();
-  }
-
-  @Override
-  public FinalApplicationStatus getFinalApplicationStatus() {
-    return report.getFinalApplicationStatus();
-  }
-
-  @Override
-  public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
-    return report.getApplicationResourceUsageReport();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
deleted file mode 100644
index 86903c1..0000000
--- a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Throwables;
-import org.apache.hadoop.yarn.api.records.Container;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-/**
- *
- */
-public final class Hadoop21YarnContainerInfo implements YarnContainerInfo {
-
-  private final Container container;
-
-  public Hadoop21YarnContainerInfo(Container container) {
-    this.container = container;
-  }
-
-  @Override
-  public <T> T getContainer() {
-    return (T) container;
-  }
-
-  @Override
-  public String getId() {
-    return container.getId().toString();
-  }
-
-  @Override
-  public InetAddress getHost() {
-    try {
-      return InetAddress.getByName(container.getNodeId().getHost());
-    } catch (UnknownHostException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public int getPort() {
-    return container.getNodeId().getPort();
-  }
-
-  @Override
-  public int getMemoryMB() {
-    return container.getResource().getMemory();
-  }
-
-  @Override
-  public int getVirtualCores() {
-    return container.getResource().getVirtualCores();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java
deleted file mode 100644
index f5758c7..0000000
--- a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerStatus.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-
-/**
- * A {@link YarnContainerStatus} that forwards to a wrapped Hadoop 2.1 {@link ContainerStatus}.
- */
-public final class Hadoop21YarnContainerStatus implements YarnContainerStatus {
-
-  private final ContainerStatus status;
-
-  /**
-   * @param containerStatus the container status that all getters read from
-   */
-  public Hadoop21YarnContainerStatus(ContainerStatus containerStatus) {
-    this.status = containerStatus;
-  }
-
-  /**
-   * @return the string form of the container id
-   */
-  @Override
-  public String getContainerId() {
-    return status.getContainerId().toString();
-  }
-
-  @Override
-  public ContainerState getState() {
-    return status.getState();
-  }
-
-  @Override
-  public int getExitStatus() {
-    return status.getExitStatus();
-  }
-
-  @Override
-  public String getDiagnostics() {
-    return status.getDiagnostics();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java
deleted file mode 100644
index 8621f93..0000000
--- a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLaunchContext.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.util.Records;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A {@link YarnLaunchContext} backed by a Hadoop 2.1 {@link ContainerLaunchContext} record that is
- * created in the constructor.
- */
-public final class Hadoop21YarnLaunchContext implements YarnLaunchContext {
-
-  /** Unwraps a {@link YarnLocalResource} into the {@link LocalResource} it carries. */
-  private static final Function<YarnLocalResource, LocalResource> TO_LOCAL_RESOURCE =
-    new Function<YarnLocalResource, LocalResource>() {
-      @Override
-      public LocalResource apply(YarnLocalResource input) {
-        return input.getLocalResource();
-      }
-    };
-
-  private final ContainerLaunchContext launchContext;
-
-  public Hadoop21YarnLaunchContext() {
-    this.launchContext = Records.newRecord(ContainerLaunchContext.class);
-  }
-
-  /**
-   * Returns the underlying launch context, cast (unchecked) to the type expected by the caller.
-   */
-  @Override
-  public <T> T getLaunchContext() {
-    return (T) launchContext;
-  }
-
-  /**
-   * Encodes the given credentials with {@code YarnUtils.encodeCredentials} and stores them as the tokens
-   * of the launch context.
-   */
-  @Override
-  public void setCredentials(Credentials credentials) {
-    launchContext.setTokens(YarnUtils.encodeCredentials(credentials));
-  }
-
-  /**
-   * Sets the local resources, unwrapping each value into its {@link LocalResource} through
-   * {@link Maps#transformValues}.
-   */
-  @Override
-  public void setLocalResources(Map<String, YarnLocalResource> localResources) {
-    launchContext.setLocalResources(Maps.transformValues(localResources, TO_LOCAL_RESOURCE));
-  }
-
-  @Override
-  public void setServiceData(Map<String, ByteBuffer> serviceData) {
-    launchContext.setServiceData(serviceData);
-  }
-
-  @Override
-  public Map<String, String> getEnvironment() {
-    return launchContext.getEnvironment();
-  }
-
-  @Override
-  public void setEnvironment(Map<String, String> environment) {
-    launchContext.setEnvironment(environment);
-  }
-
-  @Override
-  public List<String> getCommands() {
-    return launchContext.getCommands();
-  }
-
-  @Override
-  public void setCommands(List<String> commands) {
-    launchContext.setCommands(commands);
-  }
-
-  @Override
-  public void setApplicationACLs(Map<ApplicationAccessType, String> acls) {
-    launchContext.setApplicationACLs(acls);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java
deleted file mode 100644
index 3f756bd..0000000
--- a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnLocalResource.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.util.Records;
-
-/**
- * A {@link YarnLocalResource} backed by a Hadoop 2.1 {@link LocalResource} record that is created in
- * the constructor. Every getter and setter forwards to that record.
- */
-public final class Hadoop21YarnLocalResource implements YarnLocalResource {
-
-  private final LocalResource delegate;
-
-  public Hadoop21YarnLocalResource() {
-    this.delegate = Records.newRecord(LocalResource.class);
-  }
-
-  /**
-   * Returns the underlying record, cast (unchecked) to the type expected by the caller.
-   */
-  @Override
-  public <T> T getLocalResource() {
-    return (T) delegate;
-  }
-
-  @Override
-  public URL getResource() {
-    return delegate.getResource();
-  }
-
-  @Override
-  public void setResource(URL resource) {
-    delegate.setResource(resource);
-  }
-
-  @Override
-  public long getSize() {
-    return delegate.getSize();
-  }
-
-  @Override
-  public void setSize(long size) {
-    delegate.setSize(size);
-  }
-
-  @Override
-  public long getTimestamp() {
-    return delegate.getTimestamp();
-  }
-
-  @Override
-  public void setTimestamp(long timestamp) {
-    delegate.setTimestamp(timestamp);
-  }
-
-  @Override
-  public LocalResourceType getType() {
-    return delegate.getType();
-  }
-
-  @Override
-  public void setType(LocalResourceType type) {
-    delegate.setType(type);
-  }
-
-  @Override
-  public LocalResourceVisibility getVisibility() {
-    return delegate.getVisibility();
-  }
-
-  @Override
-  public void setVisibility(LocalResourceVisibility visibility) {
-    delegate.setVisibility(visibility);
-  }
-
-  @Override
-  public String getPattern() {
-    return delegate.getPattern();
-  }
-
-  @Override
-  public void setPattern(String pattern) {
-    delegate.setPattern(pattern);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java b/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
deleted file mode 100644
index d3a6a80..0000000
--- a/yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnNMClient.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.yarn;
-
-import org.apache.twill.common.Cancellable;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.AbstractIdleService;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link YarnNMClient} implemented on top of the Hadoop 2.1 {@link NMClient}. The wrapped client is
- * initialized in the constructor and started/stopped together with this service.
- */
-public final class Hadoop21YarnNMClient extends AbstractIdleService implements YarnNMClient {
-
-  private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnNMClient.class);
-
-  private final NMClient nmClient;
-
-  /**
-   * Creates the underlying {@link NMClient} and initializes it; it is started in {@link #startUp()}.
-   *
-   * @param configuration the configuration used to initialize the client
-   */
-  public Hadoop21YarnNMClient(Configuration configuration) {
-    this.nmClient = NMClient.createNMClient();
-    nmClient.init(configuration);
-  }
-
-  /**
-   * Starts the given container with the given launch context.
-   *
-   * @return a {@link Cancellable} that stops the container when cancelled; any failure to start is
-   *         logged and rethrown via {@link Throwables#propagate}
-   */
-  @Override
-  public Cancellable start(YarnContainerInfo containerInfo, YarnLaunchContext launchContext) {
-    try {
-      Container container = containerInfo.getContainer();
-      nmClient.startContainer(container, launchContext.<ContainerLaunchContext>getLaunchContext());
-      return new ContainerTerminator(container, nmClient);
-    } catch (Exception e) {
-      LOG.error("Error in launching process", e);
-      throw Throwables.propagate(e);
-    }
-
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    nmClient.start();
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    nmClient.stop();
-  }
-
-  /**
-   * Stops a container and blocks until the node manager reports it as {@link ContainerState#COMPLETE}.
-   */
-  private static final class ContainerTerminator implements Cancellable {
-
-    // Pause between two status polls, so that waiting for completion does not turn into a tight
-    // loop of back-to-back status requests and log lines.
-    private static final long STATUS_POLL_INTERVAL_MS = 200L;
-
-    private final Container container;
-    private final NMClient nmClient;
-
-    private ContainerTerminator(Container container, NMClient nmClient) {
-      this.container = container;
-      this.nmClient = nmClient;
-    }
-
-    /**
-     * Requests the container to stop, then polls its status until it is complete. Any failure is
-     * logged and rethrown via {@link Throwables#propagate}; if the waiting thread is interrupted, its
-     * interrupt flag is restored before rethrowing.
-     */
-    @Override
-    public void cancel() {
-      LOG.info("Request to stop container {}.", container.getId());
-
-      try {
-        nmClient.stopContainer(container.getId(), container.getNodeId());
-        while (true) {
-          ContainerStatus status = nmClient.getContainerStatus(container.getId(), container.getNodeId());
-          LOG.info("Container status: {} {}", status, status.getDiagnostics());
-
-          if (status.getState() == ContainerState.COMPLETE) {
-            break;
-          }
-          // Not complete yet: wait a little before asking again.
-          Thread.sleep(STATUS_POLL_INTERVAL_MS);
-        }
-        LOG.info("Container {} stopped.", container.getId());
-      } catch (InterruptedException e) {
-        // Restore the interrupt flag so that callers up the stack can observe the interruption.
-        Thread.currentThread().interrupt();
-        LOG.error("Fail to stop container {}", container.getId(), e);
-        throw Throwables.propagate(e);
-      } catch (Exception e) {
-        LOG.error("Fail to stop container {}", container.getId(), e);
-        throw Throwables.propagate(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java b/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
deleted file mode 100644
index b0eeb43..0000000
--- a/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.filesystem;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.UUID;
-
-/**
- * A concrete implementation of {@link Location} for the HDFS filesystem. Each instance pairs a
- * {@link FileSystem} with a {@link Path} on it.
- */
-final class HDFSLocation implements Location {
-  private final FileSystem fs;
-  private final Path path;
-
-  /**
-   * Constructs a HDFSLocation.
-   *
-   * @param fs  An instance of {@link FileSystem}
-   * @param path of the file.
-   */
-  HDFSLocation(FileSystem fs, Path path) {
-    this.fs = fs;
-    this.path = path;
-  }
-
-  /**
-   * Checks if this location exists on HDFS.
-   *
-   * @return true if found; false otherwise.
-   * @throws IOException
-   */
-  @Override
-  public boolean exists() throws IOException {
-    return fs.exists(path);
-  }
-
-  /**
-   * @return An {@link InputStream} for this location on HDFS.
-   * @throws IOException
-   */
-  @Override
-  public InputStream getInputStream() throws IOException {
-    return fs.open(path);
-  }
-
-  /**
-   * @return An {@link OutputStream} for this location on HDFS.
-   * @throws IOException
-   */
-  @Override
-  public OutputStream getOutputStream() throws IOException {
-    return fs.create(path);
-  }
-
-  /**
-   * Creates the file with the given permission, overwriting any existing file. Buffer size is read
-   * from the {@code io.file.buffer.size} configuration (4096 if unset); replication and block size
-   * are the filesystem defaults for this path.
-   *
-   * @param permission permission string accepted by {@link FsPermission#FsPermission(String)}
-   * @return An {@link OutputStream} for this location on HDFS.
-   * @throws IOException
-   */
-  @Override
-  public OutputStream getOutputStream(String permission) throws IOException {
-    Configuration conf = fs.getConf();
-    return fs.create(path,
-                     new FsPermission(permission),
-                     true,
-                     conf.getInt("io.file.buffer.size", 4096),
-                     fs.getDefaultReplication(path),
-                     fs.getDefaultBlockSize(path),
-                     null);
-  }
-
-  /**
-   * Appends the child to the current {@link Location} on HDFS.
-   * <p>
-   * Returns a new instance of Location.
-   * </p>
-   *
-   * @param child to be appended to this location; a single leading "/" is dropped.
-   * @return A new instance of {@link Location}
-   * @throws IOException
-   */
-  @Override
-  public Location append(String child) throws IOException {
-    if (child.startsWith("/")) {
-      child = child.substring(1);
-    }
-    return new HDFSLocation(fs, new Path(URI.create(path.toUri() + "/" + child)));
-  }
-
-  /**
-   * Creates a location whose URI is this location's URI followed by ".", a random {@link UUID} and
-   * the given suffix. No file is created on the filesystem.
-   *
-   * @param suffix suffix of the temp file name; {@code TEMP_FILE_SUFFIX} is used if null.
-   * @return A new instance of {@link Location}
-   */
-  @Override
-  public Location getTempFile(String suffix) throws IOException {
-    Path path = new Path(
-      URI.create(this.path.toUri() + "." + UUID.randomUUID() + (suffix == null ? TEMP_FILE_SUFFIX : suffix)));
-    return new HDFSLocation(fs, path);
-  }
-
-  /**
-   * @return Returns the name of the file or directory denoted by this abstract pathname.
-   */
-  @Override
-  public String getName() {
-    return path.getName();
-  }
-
-  /**
-   * @return the result of {@link FileSystem#createNewFile(Path)} for this location.
-   */
-  @Override
-  public boolean createNew() throws IOException {
-    return fs.createNewFile(path);
-  }
-
-  /**
-   * @return A {@link URI} for this location on HDFS.
-   */
-  @Override
-  public URI toURI() {
-    return path.toUri();
-  }
-
-  /**
-   * Deletes the file or directory denoted by this abstract pathname. If this
-   * pathname denotes a directory, then the directory must be empty in order
-   * to be deleted.
-   *
-   * @return true if and only if the file or directory is successfully deleted; false otherwise.
-   */
-  @Override
-  public boolean delete() throws IOException {
-    return fs.delete(path, false);
-  }
-
-  /**
-   * Deletes the file or directory denoted by this abstract pathname.
-   *
-   * @param recursive whether the content of a directory should be deleted as well; passed through to
-   *                  {@link FileSystem#delete(Path, boolean)}.
-   * @return true if and only if the file or directory is successfully deleted; false otherwise.
-   */
-  @Override
-  public boolean delete(boolean recursive) throws IOException {
-    // Honor the caller's flag; it was previously ignored and the delete was always recursive.
-    return fs.delete(path, recursive);
-  }
-
-  /**
-   * Renames this location to the given destination. On a {@link DistributedFileSystem} the rename is
-   * done with the {@link Options.Rename#OVERWRITE} option; on other filesystems the plain
-   * {@link FileSystem#rename(Path, Path)} is used.
-   *
-   * @param destination the target location; must be a {@link HDFSLocation}.
-   * @return A new {@link Location} for the destination, or {@code null} if the plain rename
-   *         reported failure.
-   */
-  @Override
-  public Location renameTo(Location destination) throws IOException {
-    // Destination will always be of the same type as this location.
-    if (fs instanceof DistributedFileSystem) {
-      ((DistributedFileSystem) fs).rename(path, ((HDFSLocation) destination).path, Options.Rename.OVERWRITE);
-      return new HDFSLocation(fs, new Path(destination.toURI()));
-    }
-
-    if (fs.rename(path, ((HDFSLocation) destination).path)) {
-      return new HDFSLocation(fs, new Path(destination.toURI()));
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * Creates the directory named by this abstract pathname, including any necessary
-   * but nonexistent parent directories.
-   *
-   * @return true if and only if the directory creation succeeded; false otherwise
-   */
-  @Override
-  public boolean mkdirs() throws IOException {
-    return fs.mkdirs(path);
-  }
-
-  /**
-   * @return Length of file.
-   */
-  @Override
-  public long length() throws IOException {
-    return fs.getFileStatus(path).getLen();
-  }
-
-  /**
-   * @return Modification time of the file, as reported by its file status.
-   */
-  @Override
-  public long lastModified() throws IOException {
-    return fs.getFileStatus(path).getModificationTime();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java b/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
deleted file mode 100644
index fa79391..0000000
--- a/yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.filesystem;
-
-import com.google.common.base.Throwables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.net.URI;
-
-/**
- * A {@link LocationFactory} that creates HDFS {@link Location}.
- */
-public final class HDFSLocationFactory implements LocationFactory {
-
-  private final FileSystem fileSystem;
-  private final String pathBase;
-
-  public HDFSLocationFactory(Configuration configuration) {
-    this(getFileSystem(configuration));
-  }
-  
-  public HDFSLocationFactory(Configuration configuration, String pathBase) {
-    this(getFileSystem(configuration), pathBase);
-  }
-
-  public HDFSLocationFactory(FileSystem fileSystem) {
-    this(fileSystem, "/");
-  }
-
-  public HDFSLocationFactory(FileSystem fileSystem, String pathBase) {
-    String base = pathBase.equals("/") ? "" : pathBase;
-    base = base.endsWith("/") ? base.substring(0, base.length() - 1) : base;
-
-    this.fileSystem = fileSystem;
-    this.pathBase = base;
-  }
-
-  @Override
-  public Location create(String path) {
-    if (path.startsWith("/")) {
-      path = path.substring(1);
-    }
-    return new HDFSLocation(fileSystem, new Path(fileSystem.getUri() + "/" + pathBase + "/" + path));
-  }
-
-  @Override
-  public Location create(URI uri) {
-    if (!uri.toString().startsWith(fileSystem.getUri().toString())) {
-      // It's a full URI
-      return new HDFSLocation(fileSystem, new Path(uri));
-    }
-    if (uri.isAbsolute()) {
-      return new HDFSLocation(fileSystem, new Path(fileSystem.getUri() + uri.getPath()));
-    }
-    return new HDFSLocation(fileSystem, new Path(fileSystem.getUri() + "/" + pathBase + "/" + uri.getPath()));
-  }
-
-  @Override
-  public Location getHomeLocation() {
-    return new HDFSLocation(fileSystem, fileSystem.getHomeDirectory());
-  }
-
-  /**
-   * Returns the underlying {@link FileSystem} object.
-   */
-  public FileSystem getFileSystem() {
-    return fileSystem;
-  }
-
-  private static FileSystem getFileSystem(Configuration configuration) {
-    try {
-      return FileSystem.get(configuration);
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/java/org/apache/twill/filesystem/package-info.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/filesystem/package-info.java b/yarn/src/main/java/org/apache/twill/filesystem/package-info.java
deleted file mode 100644
index 2ca09fd..0000000
--- a/yarn/src/main/java/org/apache/twill/filesystem/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Contains HDFS location classes.
- */
-package org.apache.twill.filesystem;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java
deleted file mode 100644
index 47dd07c..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.filesystem.Location;
-import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.state.SystemMessages;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.concurrent.Executor;
-
-/**
- * A base implementation of {@link Service} handle secure token update.
- */
-public abstract class AbstractTwillService implements Service {
-
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillService.class);
-
-  protected final Location applicationLocation;
-
-  protected volatile Credentials credentials;
-
-  protected AbstractTwillService(Location applicationLocation) {
-    this.applicationLocation = applicationLocation;
-  }
-
-  protected abstract Service getServiceDelegate();
-
-  /**
-   * Returns the location of the secure store, or {@code null} if either not running in secure mode or an error
-   * occur when trying to acquire the location.
-   */
-  protected final Location getSecureStoreLocation() {
-    if (!UserGroupInformation.isSecurityEnabled()) {
-      return null;
-    }
-    try {
-      return applicationLocation.append(Constants.Files.CREDENTIALS);
-    } catch (IOException e) {
-      LOG.error("Failed to create secure store location.", e);
-      return null;
-    }
-  }
-
-  /**
-   * Attempts to handle secure store update.
-   *
-   * @param message The message received
-   * @return {@code true} if the message requests for secure store update, {@code false} otherwise.
-   */
-  protected final boolean handleSecureStoreUpdate(Message message) {
-    if (!SystemMessages.SECURE_STORE_UPDATED.equals(message)) {
-      return false;
-    }
-
-    // If not in secure mode, simply ignore the message.
-    if (!UserGroupInformation.isSecurityEnabled()) {
-      return true;
-    }
-
-    try {
-      Credentials credentials = new Credentials();
-      Location location = getSecureStoreLocation();
-      DataInputStream input = new DataInputStream(new BufferedInputStream(location.getInputStream()));
-      try {
-        credentials.readTokenStorageStream(input);
-      } finally {
-        input.close();
-      }
-
-      UserGroupInformation.getCurrentUser().addCredentials(credentials);
-      this.credentials = credentials;
-
-      LOG.info("Secure store updated from {}.", location.toURI());
-
-    } catch (Throwable t) {
-      LOG.error("Failed to update secure store.", t);
-    }
-
-    return true;
-  }
-
-  @Override
-  public final ListenableFuture<State> start() {
-    return getServiceDelegate().start();
-  }
-
-  @Override
-  public final State startAndWait() {
-    return Futures.getUnchecked(start());
-  }
-
-  @Override
-  public final boolean isRunning() {
-    return getServiceDelegate().isRunning();
-  }
-
-  @Override
-  public final State state() {
-    return getServiceDelegate().state();
-  }
-
-  @Override
-  public final ListenableFuture<State> stop() {
-    return getServiceDelegate().stop();
-  }
-
-  @Override
-  public final State stopAndWait() {
-    return Futures.getUnchecked(stop());
-  }
-
-  @Override
-  public final void addListener(Listener listener, Executor executor) {
-    getServiceDelegate().addListener(listener, executor);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
deleted file mode 100644
index 4ffb023..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.common.Services;
-import org.apache.twill.filesystem.HDFSLocationFactory;
-import org.apache.twill.filesystem.LocalLocationFactory;
-import org.apache.twill.filesystem.Location;
-import org.apache.twill.internal.logging.KafkaAppender;
-import org.apache.twill.zookeeper.ZKClientService;
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.joran.JoranConfigurator;
-import ch.qos.logback.classic.util.ContextInitializer;
-import ch.qos.logback.core.joran.spi.JoranException;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.ILoggerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.xml.sax.InputSource;
-
-import java.io.File;
-import java.io.StringReader;
-import java.net.URI;
-import java.util.concurrent.ExecutionException;
-
-/**
- * Class for main method that starts a service.
- */
-public abstract class ServiceMain {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ServiceMain.class);
-
-  static {
-    // This is to work around detection of HADOOP_HOME (HADOOP-9422)
-    if (!System.getenv().containsKey("HADOOP_HOME") && System.getProperty("hadoop.home.dir") == null) {
-      System.setProperty("hadoop.home.dir", new File("").getAbsolutePath());
-    }
-  }
-
-  protected final void doMain(final ZKClientService zkClientService,
-                              final Service service) throws ExecutionException, InterruptedException {
-    configureLogger();
-
-    final String serviceName = service.toString();
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        Services.chainStop(service, zkClientService);
-      }
-    });
-
-    // Listener for state changes of the service
-    ListenableFuture<Service.State> completion = Services.getCompletionFuture(service);
-
-    // Starts the service
-    LOG.info("Starting service {}.", serviceName);
-    Futures.getUnchecked(Services.chainStart(zkClientService, service));
-    LOG.info("Service {} started.", serviceName);
-    try {
-      completion.get();
-      LOG.info("Service {} completed.", serviceName);
-    } catch (Throwable t) {
-      LOG.warn("Exception thrown from service {}.", serviceName, t);
-      throw Throwables.propagate(t);
-    } finally {
-      ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
-      if (loggerFactory instanceof LoggerContext) {
-        ((LoggerContext) loggerFactory).stop();
-      }
-    }
-  }
-
-  protected abstract String getHostname();
-
-  protected abstract String getKafkaZKConnect();
-
-  /**
-   * Returns the {@link Location} for the application based on the env {@link EnvKeys#TWILL_APP_DIR}.
-   */
-  protected static Location createAppLocation(Configuration conf) {
-    // Note: It's a little bit hacky based on the uri schema to create the LocationFactory, refactor it later.
-    URI appDir = URI.create(System.getenv(EnvKeys.TWILL_APP_DIR));
-
-    try {
-      if ("file".equals(appDir.getScheme())) {
-        return new LocalLocationFactory().create(appDir);
-      }
-
-      if ("hdfs".equals(appDir.getScheme())) {
-        if (UserGroupInformation.isSecurityEnabled()) {
-          return new HDFSLocationFactory(FileSystem.get(conf)).create(appDir);
-        }
-
-        String fsUser = System.getenv(EnvKeys.TWILL_FS_USER);
-        if (fsUser == null) {
-          throw new IllegalStateException("Missing environment variable " + EnvKeys.TWILL_FS_USER);
-        }
-        return new HDFSLocationFactory(FileSystem.get(FileSystem.getDefaultUri(conf), conf, fsUser)).create(appDir);
-      }
-
-      LOG.warn("Unsupported location type {}.", appDir);
-      throw new IllegalArgumentException("Unsupported location type " + appDir);
-
-    } catch (Exception e) {
-      LOG.error("Failed to create application location for {}.", appDir);
-      throw Throwables.propagate(e);
-    }
-  }
-
-  private void configureLogger() {
-    // Check if SLF4J is bound to logback in the current environment
-    ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
-    if (!(loggerFactory instanceof LoggerContext)) {
-      return;
-    }
-
-    LoggerContext context = (LoggerContext) loggerFactory;
-    context.reset();
-    JoranConfigurator configurator = new JoranConfigurator();
-    configurator.setContext(context);
-
-    try {
-      File twillLogback = new File(Constants.Files.LOGBACK_TEMPLATE);
-      if (twillLogback.exists()) {
-        configurator.doConfigure(twillLogback);
-      }
-      new ContextInitializer(context).autoConfig();
-    } catch (JoranException e) {
-      throw Throwables.propagate(e);
-    }
-    doConfigure(configurator, getLogConfig(getLoggerLevel(context.getLogger(Logger.ROOT_LOGGER_NAME))));
-  }
-
-  private void doConfigure(JoranConfigurator configurator, String config) {
-    try {
-      configurator.doConfigure(new InputSource(new StringReader(config)));
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  private String getLogConfig(String rootLevel) {
-    return
-      "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
-      "<configuration>\n" +
-      "    <appender name=\"KAFKA\" class=\"" + KafkaAppender.class.getName() + "\">\n" +
-      "        <topic>" + Constants.LOG_TOPIC + "</topic>\n" +
-      "        <hostname>" + getHostname() + "</hostname>\n" +
-      "        <zookeeper>" + getKafkaZKConnect() + "</zookeeper>\n" +
-      "    </appender>\n" +
-      "    <logger name=\"org.apache.twill.internal.logging\" additivity=\"false\" />\n" +
-      "    <root level=\"" + rootLevel + "\">\n" +
-      "        <appender-ref ref=\"KAFKA\"/>\n" +
-      "    </root>\n" +
-      "</configuration>";
-  }
-
-  private String getLoggerLevel(Logger logger) {
-    if (logger instanceof ch.qos.logback.classic.Logger) {
-      return ((ch.qos.logback.classic.Logger) logger).getLevel().toString();
-    }
-
-    if (logger.isTraceEnabled()) {
-      return "TRACE";
-    }
-    if (logger.isDebugEnabled()) {
-      return "DEBUG";
-    }
-    if (logger.isInfoEnabled()) {
-      return "INFO";
-    }
-    if (logger.isWarnEnabled()) {
-      return "WARN";
-    }
-    if (logger.isErrorEnabled()) {
-      return "ERROR";
-    }
-    return "OFF";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
deleted file mode 100644
index 028df7b..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterLiveNodeData.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.appmaster;
-
-/**
- * Represents data being stored in the live node of the application master.
- */
-public final class ApplicationMasterLiveNodeData {
-
-  private final int appId;
-  private final long appIdClusterTime;
-  private final String containerId;
-
-  public ApplicationMasterLiveNodeData(int appId, long appIdClusterTime, String containerId) {
-    this.appId = appId;
-    this.appIdClusterTime = appIdClusterTime;
-    this.containerId = containerId;
-  }
-
-  public int getAppId() {
-    return appId;
-  }
-
-  public long getAppIdClusterTime() {
-    return appIdClusterTime;
-  }
-
-  public String getContainerId() {
-    return containerId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
deleted file mode 100644
index b34a7a2..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.appmaster;
-
-import org.apache.twill.api.RunId;
-import org.apache.twill.internal.Constants;
-import org.apache.twill.internal.EnvKeys;
-import org.apache.twill.internal.RunIds;
-import org.apache.twill.internal.ServiceMain;
-import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
-import com.google.common.util.concurrent.Service;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-import java.io.File;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Main class for launching {@link ApplicationMasterService}.
- */
-public final class ApplicationMasterMain extends ServiceMain {
-
-  private final String kafkaZKConnect;
-
-  private ApplicationMasterMain(String kafkaZKConnect) {
-    this.kafkaZKConnect = kafkaZKConnect;
-  }
-
-  /**
-   * Starts the application master.
-   */
-  public static void main(String[] args) throws Exception {
-    String zkConnect = System.getenv(EnvKeys.TWILL_ZK_CONNECT);
-    File twillSpec = new File(Constants.Files.TWILL_SPEC);
-    RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
-
-    ZKClientService zkClientService =
-      ZKClientServices.delegate(
-        ZKClients.reWatchOnExpire(
-          ZKClients.retryOnFailure(
-            ZKClientService.Builder.of(zkConnect).build(),
-            RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
-
-    Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
-    Service service = new ApplicationMasterService(runId, zkClientService, twillSpec,
-                                                   new VersionDetectYarnAMClientFactory(conf), createAppLocation(conf));
-    new ApplicationMasterMain(String.format("%s/%s/kafka", zkConnect, runId.getId())).doMain(zkClientService, service);
-  }
-
-  @Override
-  protected String getHostname() {
-    try {
-      return InetAddress.getLocalHost().getCanonicalHostName();
-    } catch (UnknownHostException e) {
-      return "unknown";
-    }
-  }
-
-  @Override
-  protected String getKafkaZKConnect() {
-    return kafkaZKConnect;
-  }
-}