You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/08/12 23:26:19 UTC

svn commit: r1513258 [5/10] - in /hadoop/common/branches/YARN-321/hadoop-yarn-project: ./ hadoop-yarn/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ hadoop-...

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Mon Aug 12 21:25:49 2013
@@ -23,7 +23,10 @@ import static org.apache.hadoop.service.
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -37,6 +40,7 @@ import org.apache.hadoop.io.DataInputByt
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -47,23 +51,25 @@ import org.apache.hadoop.service.Service
 import org.apache.hadoop.service.ServiceStateChangeListener;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
+import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -79,6 +85,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -125,8 +132,6 @@ public class ContainerManagerImpl extend
 
   private final NodeStatusUpdater nodeStatusUpdater;
 
-  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
   protected LocalDirsHandlerService dirsHandler;
   protected final AsyncDispatcher dispatcher;
   private final ApplicationACLsManager aclsManager;
@@ -166,7 +171,6 @@ public class ContainerManagerImpl extend
         new ContainersMonitorImpl(exec, dispatcher, this.context);
     addService(this.containersMonitor);
 
-
     dispatcher.register(ContainerEventType.class,
         new ContainerEventDispatcher());
     dispatcher.register(ApplicationEventType.class,
@@ -228,6 +232,13 @@ public class ContainerManagerImpl extend
     // Enqueue user dirs in deletion context
 
     Configuration conf = getConfig();
+    Configuration serverConf = new Configuration(conf);
+
+    // always enforce it to be token-based.
+    serverConf.set(
+      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+      SaslRpcServer.AuthMethod.TOKEN.toString());
+    
     YarnRPC rpc = YarnRPC.create(conf);
 
     InetSocketAddress initialAddress = conf.getSocketAddr(
@@ -236,8 +247,8 @@ public class ContainerManagerImpl extend
         YarnConfiguration.DEFAULT_NM_PORT);
 
     server =
-        rpc.getServer(ContainerManagementProtocol.class, this, initialAddress, conf,
-            this.context.getNMTokenSecretManager(),
+        rpc.getServer(ContainerManagementProtocol.class, this, initialAddress, 
+            serverConf, this.context.getNMTokenSecretManager(),
             conf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT, 
                 YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT));
     
@@ -247,7 +258,7 @@ public class ContainerManagerImpl extend
         false)) {
       refreshServiceAcls(conf, new NMPolicyProvider());
     }
-
+    
     LOG.info("Blocking new container-requests as container manager rpc" +
     		" server is still starting.");
     this.setBlockNewContainerRequests(true);
@@ -312,18 +323,25 @@ public class ContainerManagerImpl extend
     return resultId;
   }
 
+  // Rejects the call unless the RPC user name equals the NMToken's attempt id.
+  protected void authorizeUser(UserGroupInformation remoteUgi,
+      NMTokenIdentifier nmTokenIdentifier) throws YarnException {
+    String attemptId = nmTokenIdentifier.getApplicationAttemptId().toString();
+    if (!remoteUgi.getUserName().equals(attemptId)) {
+      throw RPCUtil.getRemoteException("Expected applicationAttemptId: "
+          + remoteUgi.getUserName() + " Found: " + attemptId);
+    }
+  }
+
   /**
    * @param containerTokenIdentifier
    *          of the container to be started
-   * @param ugi
-   *          ugi corresponding to the remote end making the api-call
    * @throws YarnException
    */
   @Private
   @VisibleForTesting
   protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
-      ContainerTokenIdentifier containerTokenIdentifier,
-      UserGroupInformation ugi) throws YarnException {
+      ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
 
     ContainerId containerId = containerTokenIdentifier.getContainerID();
     String containerIDStr = containerId.toString();
@@ -338,14 +356,8 @@ public class ContainerManagerImpl extend
         .append(" was used for starting container with container token")
         .append(" issued for application attempt : ")
         .append(containerId.getApplicationAttemptId());
-    } else if (!ugi.getUserName().equals(
-        nmTokenIdentifier.getApplicationAttemptId().toString())) {
-      unauthorized = true;
-      messageBuilder.append("\nExpected applicationAttemptId: ")
-        .append(ugi.getUserName()).append(" Found: ")
-        .append(nmTokenIdentifier.getApplicationAttemptId().toString());
     } else if (!this.context.getContainerTokenSecretManager()
-        .isValidStartContainerRequest(containerId)) {
+        .isValidStartContainerRequest(containerTokenIdentifier)) {
       // Is the container being relaunched? Or RPC layer let startCall with
       // tokens generated off old-secret through?
       unauthorized = true;
@@ -359,7 +371,6 @@ public class ContainerManagerImpl extend
         .append(System.currentTimeMillis()).append(" found ")
         .append(containerTokenIdentifier.getExpiryTimeStamp());
     }
-
     if (unauthorized) {
       String msg = messageBuilder.toString();
       LOG.error(msg);
@@ -368,18 +379,53 @@ public class ContainerManagerImpl extend
   }
 
   /**
-   * Start a container on this NodeManager.
+   * Start a list of containers on this NodeManager.
    */
-  @SuppressWarnings("unchecked")
   @Override
-  public StartContainerResponse startContainer(StartContainerRequest request)
-      throws YarnException, IOException {
-
+  public StartContainersResponse
+      startContainers(StartContainersRequest requests) throws YarnException,
+          IOException {
     if (blockNewContainerRequests.get()) {
       throw new NMNotYetReadyException(
         "Rejecting new containers as NodeManager has not"
             + " yet connected with ResourceManager");
     }
+    UserGroupInformation remoteUgi = getRemoteUgi();
+    NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
+    authorizeUser(remoteUgi,nmTokenIdentifier);
+    List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
+    Map<ContainerId, SerializedException> failedContainers =
+        new HashMap<ContainerId, SerializedException>();
+    for (StartContainerRequest request : requests.getStartContainerRequests()) {
+      ContainerId containerId = null;
+      try {
+        ContainerTokenIdentifier containerTokenIdentifier =
+            BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
+        verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
+          containerTokenIdentifier);
+        containerId = containerTokenIdentifier.getContainerID();
+        startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
+          request);
+        succeededContainers.add(containerId);
+      } catch (YarnException e) {
+        failedContainers.put(containerId, SerializedException.newInstance(e));
+      } catch (InvalidToken ie) {
+        failedContainers.put(containerId, SerializedException.newInstance(ie));
+        throw ie;
+      } catch (IOException e) {
+        throw RPCUtil.getRemoteException(e);
+      }
+    }
+
+    return StartContainersResponse.newInstance(auxiliaryServices.getMetaData(),
+      succeededContainers, failedContainers);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
+      ContainerTokenIdentifier containerTokenIdentifier,
+      StartContainerRequest request) throws YarnException, IOException {
+
     /*
      * 1) It should save the NMToken into NMTokenSecretManager. This is done
      * here instead of RPC layer because at the time of opening/authenticating
@@ -391,18 +437,8 @@ public class ContainerManagerImpl extend
      * belongs to correct Node Manager (part of retrieve password). c) It has
      * correct RMIdentifier. d) It is not expired.
      */
-    // update NMToken
-
-    UserGroupInformation remoteUgi = getRemoteUgi();
-    NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
-    
-    // Validate containerToken
-    ContainerTokenIdentifier containerTokenIdentifier =
-        verifyAndGetContainerTokenIdentifier(request.getContainerToken());
-
-    authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier,
-      remoteUgi);
-
+    authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier);
+ 
     if (containerTokenIdentifier.getRMIdentifer() != nodeStatusUpdater
         .getRMIdentifier()) {
         // Is the container coming from unknown RM
@@ -411,9 +447,9 @@ public class ContainerManagerImpl extend
           .append(" rejected as it is allocated by a previous RM");
         throw new InvalidContainerException(sb.toString());
     }
-    
+    // update NMToken
     updateNMTokenIdentifier(nmTokenIdentifier);
-    
+
     ContainerId containerId = containerTokenIdentifier.getContainerID();
     String containerIdStr = containerId.toString();
     String user = containerTokenIdentifier.getApplicationSubmitter();
@@ -457,26 +493,16 @@ public class ContainerManagerImpl extend
       containerTokenIdentifier);
     NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
       "ContainerManageImpl", applicationID, containerId);
-    StartContainerResponse response =
-        recordFactory.newRecordInstance(StartContainerResponse.class);
-    response.setAllServicesMetaData(auxiliaryServices.getMetaData());
     // TODO launchedContainer misplaced -> doesn't necessarily mean a container
     // launch. A finished Application will not launch containers.
     metrics.launchedContainer();
-    metrics.allocateContainer(containerTokenIdentifier.getResource());
-    return response;
+    metrics.allocateContainer(containerTokenIdentifier.getResource()); 
   }
 
   protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
-      org.apache.hadoop.yarn.api.records.Token token) throws YarnException,
+      org.apache.hadoop.yarn.api.records.Token token,
+      ContainerTokenIdentifier containerTokenIdentifier) throws YarnException,
       InvalidToken {
-    ContainerTokenIdentifier containerTokenIdentifier = null;
-    try {
-      containerTokenIdentifier =
-          BuilderUtils.newContainerTokenIdentifier(token);
-    } catch (IOException e) {
-      throw RPCUtil.getRemoteException(e);
-    }
     byte[] password =
         context.getContainerTokenSecretManager().retrievePassword(
           containerTokenIdentifier);
@@ -524,64 +550,110 @@ public class ContainerManagerImpl extend
   }
 
   /**
-   * Stop the container running on this NodeManager.
+   * Stop a list of containers running on this NodeManager.
    */
   @Override
-  @SuppressWarnings("unchecked")
-  public StopContainerResponse stopContainer(StopContainerRequest request)
+  public StopContainersResponse stopContainers(StopContainersRequest requests)
       throws YarnException, IOException {
 
-    ContainerId containerID = request.getContainerId();
+    List<ContainerId> succeededRequests = new ArrayList<ContainerId>();
+    Map<ContainerId, SerializedException> failedRequests =
+        new HashMap<ContainerId, SerializedException>();
+    UserGroupInformation remoteUgi = getRemoteUgi();
+    NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi);
+    for (ContainerId id : requests.getContainerIds()) {
+      try {
+        stopContainerInternal(identifier, id);
+        succeededRequests.add(id);
+      } catch (YarnException e) {
+        failedRequests.put(id, SerializedException.newInstance(e));
+      }
+    }
+    return StopContainersResponse
+      .newInstance(succeededRequests, failedRequests);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier,
+      ContainerId containerID) throws YarnException {
     String containerIDStr = containerID.toString();
     Container container = this.context.getContainers().get(containerID);
-    LOG.info("Getting container-status for " + containerIDStr);
-    authorizeGetAndStopContainerRequest(containerID, container, true);
-
-    StopContainerResponse response =
-        recordFactory.newRecordInstance(StopContainerResponse.class);
-
-    dispatcher.getEventHandler().handle(
-      new ContainerKillEvent(containerID,
-        "Container killed by the ApplicationMaster."));
+    LOG.info("Stopping container with container Id: " + containerIDStr);
+    authorizeGetAndStopContainerRequest(containerID, container, true,
+      nmTokenIdentifier);
 
-    NMAuditLogger.logSuccess(container.getUser(),
-      AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
-        .getApplicationAttemptId().getApplicationId(), containerID);
-
-    // TODO: Move this code to appropriate place once kill_container is
-    // implemented.
-    nodeStatusUpdater.sendOutofBandHeartBeat();
+    if (container == null) {
+      if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
+        throw RPCUtil.getRemoteException("Container " + containerIDStr
+          + " is not handled by this NodeManager");
+      }
+    } else {
+      dispatcher.getEventHandler().handle(
+        new ContainerKillEvent(containerID,
+          "Container killed by the ApplicationMaster."));
 
-    return response;
+      NMAuditLogger.logSuccess(container.getUser(),    
+        AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
+          .getApplicationAttemptId().getApplicationId(), containerID);
+
+      // TODO: Move this code to appropriate place once kill_container is
+      // implemented.
+      nodeStatusUpdater.sendOutofBandHeartBeat();
+    }
   }
 
+  /**
+   * Get the statuses of a list of containers running on this NodeManager.
+   */
   @Override
-  public GetContainerStatusResponse getContainerStatus(
-      GetContainerStatusRequest request) throws YarnException, IOException {
+  public GetContainerStatusesResponse getContainerStatuses(
+      GetContainerStatusesRequest request) throws YarnException, IOException {
 
-    ContainerId containerID = request.getContainerId();
+    List<ContainerStatus> succeededRequests = new ArrayList<ContainerStatus>();
+    Map<ContainerId, SerializedException> failedRequests =
+        new HashMap<ContainerId, SerializedException>();
+    UserGroupInformation remoteUgi = getRemoteUgi();
+    NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi);
+    for (ContainerId id : request.getContainerIds()) {
+      try {
+        ContainerStatus status = getContainerStatusInternal(id, identifier);
+        succeededRequests.add(status);
+      } catch (YarnException e) {
+        failedRequests.put(id, SerializedException.newInstance(e));
+      }
+    }
+    return GetContainerStatusesResponse.newInstance(succeededRequests,
+      failedRequests);
+  }
+
+  private ContainerStatus getContainerStatusInternal(ContainerId containerID,
+      NMTokenIdentifier nmTokenIdentifier) throws YarnException {
     String containerIDStr = containerID.toString();
     Container container = this.context.getContainers().get(containerID);
 
     LOG.info("Getting container-status for " + containerIDStr);
-    authorizeGetAndStopContainerRequest(containerID, container, false);
+    authorizeGetAndStopContainerRequest(containerID, container, false,
+      nmTokenIdentifier);
 
+    if (container == null) {
+      if (nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
+        throw RPCUtil.getRemoteException("Container " + containerIDStr
+          + " was recently stopped on node manager.");
+      } else {
+        throw RPCUtil.getRemoteException("Container " + containerIDStr
+          + " is not handled by this NodeManager");
+      }
+    }
     ContainerStatus containerStatus = container.cloneAndGetContainerStatus();
     LOG.info("Returning " + containerStatus);
-    GetContainerStatusResponse response =
-        recordFactory.newRecordInstance(GetContainerStatusResponse.class);
-    response.setStatus(containerStatus);
-    return response;
+    return containerStatus;
   }
 
   @Private
   @VisibleForTesting
   protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
-      Container container, boolean stopRequest) throws YarnException {
-
-    UserGroupInformation remoteUgi = getRemoteUgi();
-    NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi);
-
+      Container container, boolean stopRequest, NMTokenIdentifier identifier)
+      throws YarnException {
     /*
      * For get/stop container status; we need to verify that 1) User (NMToken)
      * application attempt only has started container. 2) Requested containerId
@@ -603,17 +675,11 @@ public class ContainerManagerImpl extend
           container.getContainerId());
       } else {
         LOG.warn(identifier.getApplicationAttemptId()
-            + " attempted to get get status for non-application container : "
+            + " attempted to get status for non-application container : "
             + container.getContainerId().toString());
       }
-      throw RPCUtil.getRemoteException("Container " + containerId.toString()
-          + " is not started by this application attempt.");
     }
 
-    if (container == null) {
-      throw RPCUtil.getRemoteException("Container " + containerId.toString()
-          + " is not handled by this NodeManager");
-    }
   }
 
   class ContainerEventDispatcher implements EventHandler<ContainerEvent> {

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Mon Aug 12 21:25:49 2013
@@ -392,9 +392,6 @@ public class ApplicationImpl implements 
     @Override
     public void transition(ApplicationImpl app, ApplicationEvent event) {
 
-      // Inform the ContainerTokenSecretManager
-      app.context.getContainerTokenSecretManager().appFinished(app.appId);
-
       // Inform the logService
       app.dispatcher.getEventHandler().handle(
           new LogHandlerAppFinishedEvent(app.appId));

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Mon Aug 12 21:25:49 2013
@@ -103,7 +103,7 @@ public class ContainerImpl implements Co
   public ContainerImpl(Configuration conf, Dispatcher dispatcher,
       ContainerLaunchContext launchContext, Credentials creds,
       NodeManagerMetrics metrics,
-      ContainerTokenIdentifier containerTokenIdentifier) throws IOException {
+      ContainerTokenIdentifier containerTokenIdentifier) {
     this.daemonConf = conf;
     this.dispatcher = dispatcher;
     this.launchContext = launchContext;
@@ -290,6 +290,11 @@ public class ContainerImpl implements Co
     .addTransition(ContainerState.DONE, ContainerState.DONE,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
+    // This transition may result when
+    // we notify container of failed localization if localizer thread (for
+    // that container) fails for some reason
+    .addTransition(ContainerState.DONE, ContainerState.DONE,
+        ContainerEventType.RESOURCE_FAILED)
 
     // create the topology tables
     .installTopology();
@@ -330,8 +335,11 @@ public class ContainerImpl implements Co
   public Map<Path,List<String>> getLocalizedResources() {
     this.readLock.lock();
     try {
-    assert ContainerState.LOCALIZED == getContainerState(); // TODO: FIXME!!
-    return localizedResources;
+      if (ContainerState.LOCALIZED == getContainerState()) {
+        return localizedResources;
+      } else {
+        return null;
+      }
     } finally {
       this.readLock.unlock();
     }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Mon Aug 12 21:25:49 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
@@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
@@ -119,14 +121,20 @@ public class ContainerLaunch implements 
   @SuppressWarnings("unchecked") // dispatcher not typed
   public Integer call() {
     final ContainerLaunchContext launchContext = container.getLaunchContext();
-    final Map<Path,List<String>> localResources =
-        container.getLocalizedResources();
+    Map<Path,List<String>> localResources = null;
     ContainerId containerID = container.getContainerId();
     String containerIdStr = ConverterUtils.toString(containerID);
     final List<String> command = launchContext.getCommands();
     int ret = -1;
 
     try {
+      localResources = container.getLocalizedResources();
+      if (localResources == null) { // null unless container is LOCALIZED
+        throw RPCUtil.getRemoteException(
+            "Unable to get local resources when Container " + containerID +
+            " is at " + container.getContainerState());
+      }
+
       final String user = container.getUser();
       // /////////////////////////// Variable expansion
       // Before the container script gets written out.
@@ -308,6 +316,7 @@ public class ContainerLaunch implements 
    * the process id is available.
    * @throws IOException
    */
+  @SuppressWarnings("unchecked") // dispatcher not typed
   public void cleanupContainer() throws IOException {
     ContainerId containerId = container.getContainerId();
     String containerIdStr = ConverterUtils.toString(containerId);
@@ -355,13 +364,17 @@ public class ContainerLaunch implements 
               + " as user " + user
               + " for container " + containerIdStr
               + ", result=" + (result? "success" : "failed"));
-          new DelayedProcessKiller(user,
+          new DelayedProcessKiller(container, user,
               processId, sleepDelayBeforeSigKill, Signal.KILL, exec).start();
         }
       }
     } catch (Exception e) {
-      LOG.warn("Got error when trying to cleanup container " + containerIdStr
-          + ", error=" + e.getMessage());
+      String message =
+          "Exception when trying to cleanup container " + containerIdStr
+              + ": " + StringUtils.stringifyException(e);
+      LOG.warn(message);
+      dispatcher.getEventHandler().handle(
+        new ContainerDiagnosticsUpdateEvent(containerId, message));
     } finally {
       // cleanup pid file if present
       if (pidFilePath != null) {

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Mon Aug 12 21:25:49 2013
@@ -37,12 +37,17 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -61,7 +66,8 @@ public class ContainersLauncher extends 
   private final Dispatcher dispatcher;
 
   private LocalDirsHandlerService dirsHandler;
-  private final ExecutorService containerLauncher =
+  @VisibleForTesting
+  public ExecutorService containerLauncher =
     Executors.newCachedThreadPool(
         new ThreadFactoryBuilder()
           .setNameFormat("ContainersLauncher #%d")
@@ -107,6 +113,7 @@ public class ContainersLauncher extends 
     super.serviceStop();
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void handle(ContainersLauncherEvent event) {
     // TODO: ContainersLauncher launches containers one by one!!
@@ -134,9 +141,18 @@ public class ContainersLauncher extends 
         Future<Integer> rContainer = rContainerDatum.runningcontainer;
         if (rContainer != null 
             && !rContainer.isDone()) {
-          // Cancel the future so that it won't be launched 
-          // if it isn't already.
-          rContainer.cancel(false);
+          // Cancel the future so that it won't be launched if it isn't already.
+          // If it is going to be canceled, make sure CONTAINER_KILLED_ON_REQUEST
+          // will not be missed if the container is already at KILLING
+          if (rContainer.cancel(false)) {
+            if (container.getContainerState() == ContainerState.KILLING) {
+              dispatcher.getEventHandler().handle(
+                  new ContainerExitEvent(containerId,
+                      ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+                      ExitCode.TERMINATED.getExitCode(),
+                      "Container terminated before launch."));
+            }
+          }
         }
 
         // Cleanup a container whether it is running/killed/completed, so that

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Mon Aug 12 21:25:49 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -288,11 +289,10 @@ public class ContainerLocalizer {
           stat.setStatus(ResourceStatusType.FETCH_SUCCESS);
         } catch (ExecutionException e) {
           stat.setStatus(ResourceStatusType.FETCH_FAILURE);
-          stat.setException(
-              YarnServerBuilderUtils.newSerializedException(e.getCause()));
+          stat.setException(SerializedException.newInstance(e.getCause()));
         } catch (CancellationException e) {
           stat.setStatus(ResourceStatusType.FETCH_FAILURE);
-          stat.setException(YarnServerBuilderUtils.newSerializedException(e));
+          stat.setException(SerializedException.newInstance(e));
         }
         // TODO shouldn't remove until ACK
         i.remove();

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Mon Aug 12 21:25:49 2013
@@ -242,6 +242,7 @@ class LocalResourcesTrackerImpl implemen
         delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
       }
       decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc);
+      LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache");
       return true;
     }
   }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Mon Aug 12 21:25:49 2013
@@ -112,12 +112,17 @@ public class LocalizedResource implement
       .append(getState() == ResourceState.LOCALIZED
           ? getLocalPath() + "," + getSize()
           : "pending").append(",[");
-    for (ContainerId c : ref) {
-      sb.append("(").append(c.toString()).append(")");
+    this.readLock.lock(); // outside try: finally must not unlock an unheld lock
+    try {
+      for (ContainerId c : ref) {
+        sb.append("(").append(c.toString()).append(")");
+      }
+      sb.append("],").append(getTimestamp()).append(",").append(getState())
+        .append("}");
+      return sb.toString();
+    } finally {
+      this.readLock.unlock();
     }
-    sb.append("],").append(getTimestamp()).append(",")
-      .append(getState()).append("}");
-    return sb.toString();
   }
 
   private void release(ContainerId container) {
@@ -188,8 +193,8 @@ public class LocalizedResource implement
         LOG.warn("Can't handle this event at current state", e);
       }
       if (oldState != newState) {
-        LOG.info("Resource " + resourcePath + " transitioned from "
-            + oldState
+        LOG.info("Resource " + resourcePath + (localPath != null ? 
+          "(->" + localPath + ")": "") + " transitioned from " + oldState
             + " to " + newState);
       }
     } finally {

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Mon Aug 12 21:25:49 2013
@@ -29,6 +29,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -82,6 +83,7 @@ import org.apache.hadoop.yarn.ipc.YarnRP
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
 import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
@@ -629,23 +631,16 @@ public class ResourceLocalizationService
     final Configuration conf;
     final ExecutorService threadPool;
     final CompletionService<Path> queue;
+    // It's shared between the public localizer and the dispatcher thread.
     final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
 
     PublicLocalizer(Configuration conf) {
-      this(conf, getLocalFileContext(conf),
-           createLocalizerExecutor(conf),
-           new HashMap<Future<Path>,LocalizerResourceRequestEvent>());
-    }
-    
-    PublicLocalizer(Configuration conf, FileContext lfs,
-        ExecutorService threadPool,
-        Map<Future<Path>,LocalizerResourceRequestEvent> pending) {
       super("Public Localizer");
-      this.lfs = lfs;
+      this.lfs = getLocalFileContext(conf);
       this.conf = conf;
-      this.pending = pending;
-
-      this.threadPool = threadPool;
+      this.pending =
+          new ConcurrentHashMap<Future<Path>, LocalizerResourceRequestEvent>();
+      this.threadPool = createLocalizerExecutor(conf);
       this.queue = new ExecutorCompletionService<Path>(threadPool);
     }
 
@@ -747,6 +742,7 @@ public class ResourceLocalizationService
     final LocalizerContext context;
     final String localizerId;
     final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
+    // It's a list shared between the private localizer and dispatcher thread.
     final List<LocalizerResourceRequestEvent> pending;
 
     // TODO: threadsafe, use outer?
@@ -757,13 +753,14 @@ public class ResourceLocalizationService
       super("LocalizerRunner for " + localizerId);
       this.context = context;
       this.localizerId = localizerId;
-      this.pending = new ArrayList<LocalizerResourceRequestEvent>();
+      this.pending =
+          Collections
+            .synchronizedList(new ArrayList<LocalizerResourceRequestEvent>());
       this.scheduled =
           new HashMap<LocalResourceRequest, LocalizerResourceRequestEvent>();
     }
 
     public void addResource(LocalizerResourceRequestEvent request) {
-      // TDOO: Synchronization
       pending.add(request);
     }
 
@@ -773,43 +770,44 @@ public class ResourceLocalizationService
      * @return
      */
     private LocalResource findNextResource() {
-      // TODO: Synchronization
-      for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
-           i.hasNext();) {
-        LocalizerResourceRequestEvent evt = i.next();
-        LocalizedResource nRsrc = evt.getResource();
-        // Resource download should take place ONLY if resource is in
-        // Downloading state
-        if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) {
-          i.remove();
-          continue;
-        }
-        /*
-         * Multiple containers will try to download the same resource. So the
-         * resource download should start only if
-         * 1) We can acquire a non blocking semaphore lock on resource
-         * 2) Resource is still in DOWNLOADING state
-         */
-        if (nRsrc.tryAcquire()) {
-          if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) {
-            LocalResourceRequest nextRsrc = nRsrc.getRequest();
-            LocalResource next =
-                recordFactory.newRecordInstance(LocalResource.class);
-            next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc
-              .getPath()));
-            next.setTimestamp(nextRsrc.getTimestamp());
-            next.setType(nextRsrc.getType());
-            next.setVisibility(evt.getVisibility());
-            next.setPattern(evt.getPattern());
-            scheduled.put(nextRsrc, evt);
-            return next;
-          } else {
-            // Need to release acquired lock
-            nRsrc.unlock();
-          }
-        }
+      synchronized (pending) {
+        for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
+            i.hasNext();) {
+         LocalizerResourceRequestEvent evt = i.next();
+         LocalizedResource nRsrc = evt.getResource();
+         // Resource download should take place ONLY if resource is in
+         // Downloading state
+         if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) {
+           i.remove();
+           continue;
+         }
+         /*
+          * Multiple containers will try to download the same resource. So the
+          * resource download should start only if
+          * 1) We can acquire a non blocking semaphore lock on resource
+          * 2) Resource is still in DOWNLOADING state
+          */
+         if (nRsrc.tryAcquire()) {
+           if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) {
+             LocalResourceRequest nextRsrc = nRsrc.getRequest();
+             LocalResource next =
+                 recordFactory.newRecordInstance(LocalResource.class);
+             next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc
+               .getPath()));
+             next.setTimestamp(nextRsrc.getTimestamp());
+             next.setType(nextRsrc.getType());
+             next.setVisibility(evt.getVisibility());
+             next.setPattern(evt.getPattern());
+             scheduled.put(nextRsrc, evt);
+             return next;
+           } else {
+             // Need to release acquired lock
+             nRsrc.unlock();
+           }
+         }
+       }
+       return null;
       }
-      return null;
     }
 
     LocalizerHeartbeatResponse update(
@@ -1094,7 +1092,8 @@ public class ResourceLocalizationService
         try {
           if (status.getPath().getName().matches(".*" +
               ContainerLocalizer.USERCACHE + "_DEL_.*")) {
-            cleanUpFilesFromSubDir(lfs, del, status.getPath());
+            LOG.info("usercache path : " + status.getPath().toString());
+            cleanUpFilesPerUserDir(lfs, del, status.getPath());
           } else if (status.getPath().getName()
               .matches(".*" + NM_PRIVATE_DIR + "_DEL_.*")
               ||
@@ -1111,17 +1110,41 @@ public class ResourceLocalizationService
     }
   }
 
-  private void cleanUpFilesFromSubDir(FileContext lfs, DeletionService del,
-      Path dirPath) throws IOException {
-    RemoteIterator<FileStatus> fileStatus = lfs.listStatus(dirPath);
-    if (fileStatus != null) {
-      while (fileStatus.hasNext()) {
-        FileStatus status = fileStatus.next();
+  /**
+   * Schedules deletion of a per-user cache directory and of every entry
+   * directly under it. Each entry gets its own deletion task, created with
+   * the entry's owner; the task that removes the directory itself is attached
+   * to every one of them as a dependency.
+   *
+   * @param lfs file context used to list {@code userDirPath}
+   * @param del deletion service that creates and schedules the tasks
+   * @param userDirPath directory to clean up and then remove
+   * @throws IOException if the directory cannot be listed
+   */
+  private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del,
+      Path userDirPath) throws IOException {
+    RemoteIterator<FileStatus> userDirStatus = lfs.listStatus(userDirPath);
+    FileDeletionTask dependentDeletionTask =
+        del.createFileDeletionTask(null, userDirPath, new Path[] {});
+    // An empty listing creates no per-entry task that carries the directory
+    // task as a dependency, so it is scheduled directly instead (else branch).
+    if (userDirStatus != null && userDirStatus.hasNext()) {
+      List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
+      while (userDirStatus.hasNext()) {
+        FileStatus status = userDirStatus.next();
         String owner = status.getOwner();
-        del.delete(owner, status.getPath(), new Path[] {});
+        FileDeletionTask deletionTask =
+            del.createFileDeletionTask(owner, null,
+              new Path[] { status.getPath() });
+        deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+        deletionTasks.add(deletionTask);
+      }
+      for (FileDeletionTask task : deletionTasks) {
+        del.scheduleFileDeletionTask(task);
       }
+    } else {
+      del.scheduleFileDeletionTask(dependentDeletionTask);
     }
-    del.delete(null, dirPath, new Path[] {});
   }
 
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java Mon Aug 12 21:25:49 2013
@@ -18,17 +18,17 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.security;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -48,14 +48,15 @@ public class NMContainerTokenSecretManag
       .getLog(NMContainerTokenSecretManager.class);
   
   private MasterKeyData previousMasterKey;
+  private final TreeMap<Long, List<ContainerId>> recentlyStartedContainerTracker;
+
   
-  private final Map<ApplicationId, ConcurrentMap<ContainerId, MasterKeyData>> oldMasterKeys;
   private String nodeHostAddr;
   
   public NMContainerTokenSecretManager(Configuration conf) {
     super(conf);
-    this.oldMasterKeys =
-        new HashMap<ApplicationId, ConcurrentMap<ContainerId, MasterKeyData>>();
+    recentlyStartedContainerTracker =
+        new TreeMap<Long, List<ContainerId>>();
   }
 
   /**
@@ -93,9 +94,6 @@ public class NMContainerTokenSecretManag
   public synchronized byte[] retrievePassword(
       ContainerTokenIdentifier identifier) throws SecretManager.InvalidToken {
     int keyId = identifier.getMasterKeyId();
-    ContainerId containerId = identifier.getContainerID();
-    ApplicationId appId =
-        containerId.getApplicationAttemptId().getApplicationId();
 
     MasterKeyData masterKeyToUse = null;
     if (this.previousMasterKey != null
@@ -107,19 +105,6 @@ public class NMContainerTokenSecretManag
       // A container-launch has come in with a token generated off the current
       // master-key
       masterKeyToUse = super.currentMasterKey;
-    } else if (this.oldMasterKeys.containsKey(appId)
-        && this.oldMasterKeys.get(appId).containsKey(containerId)) {
-      // This means on the following happened:
-      // (1) a stopContainer() or a getStatus() happened for a container with
-      // token generated off a master-key that is neither current nor the
-      // previous one.
-      // (2) a container-relaunch has come in with a token generated off a
-      // master-key that is neither current nor the previous one.
-      // This basically lets stop and getStatus() calls with old-tokens to pass
-      // through without any issue, i.e. (1).
-      // Start-calls for repetitive launches (2) also pass through RPC here, but
-      // get thwarted at the app-layer as part of startContainer() call.
-      masterKeyToUse = this.oldMasterKeys.get(appId).get(containerId);
     }
 
     if (nodeHostAddr != null
@@ -143,61 +128,64 @@ public class NMContainerTokenSecretManag
   }
 
   /**
-   * Container start has gone through. Store the corresponding keys so that
-   * stopContainer() and getContainerStatus() can be authenticated long after
-   * the container-start went through.
+   * Container start has gone through. We need to store the containerId in order
+   * to block future container start requests with same container token. This
+   * container token needs to be saved till its container token expires.
    */
   public synchronized void startContainerSuccessful(
       ContainerTokenIdentifier tokenId) {
-    int keyId = tokenId.getMasterKeyId();
-    if (currentMasterKey.getMasterKey().getKeyId() == keyId) {
-      addKeyForContainerId(tokenId.getContainerID(), currentMasterKey);
-    } else if (previousMasterKey != null
-        && previousMasterKey.getMasterKey().getKeyId() == keyId) {
-      addKeyForContainerId(tokenId.getContainerID(), previousMasterKey);
+
+    removeAnyContainerTokenIfExpired();
+    
+    Long expTime = tokenId.getExpiryTimeStamp();
+    // We might have multiple containers with same expiration time.
+    if (!recentlyStartedContainerTracker.containsKey(expTime)) {
+      recentlyStartedContainerTracker
+        .put(expTime, new ArrayList<ContainerId>());
+    }
+    recentlyStartedContainerTracker.get(expTime).add(tokenId.getContainerID());
+
+  }
+
+  protected synchronized void removeAnyContainerTokenIfExpired() {
+    // Trying to remove any container if its container token has expired.
+    Iterator<Entry<Long, List<ContainerId>>> containersI =
+        this.recentlyStartedContainerTracker.entrySet().iterator();
+    Long currTime = System.currentTimeMillis();
+    while (containersI.hasNext()) {
+      Entry<Long, List<ContainerId>> containerEntry = containersI.next();
+      if (containerEntry.getKey() < currTime) {
+        containersI.remove();
+      } else {
+        break;
+      }
     }
   }
 
   /**
-   * Ensure the startContainer call is not using an older cached key. Will
-   * return false once startContainerSuccessful is called. Does not check
-   * the actual key being current since that is verified by the security layer
-   * via retrievePassword.
+   * Container will be remembered based on expiration time of the container
+   * token used for starting the container. It is safe to use expiration time
+   * as there is one to many mapping between expiration time and containerId.
+   * @return true if the current token identifier is not present in cache.
    */
   public synchronized boolean isValidStartContainerRequest(
-      ContainerId containerID) {
-    ApplicationId applicationId =
-        containerID.getApplicationAttemptId().getApplicationId();
-    return !this.oldMasterKeys.containsKey(applicationId)
-        || !this.oldMasterKeys.get(applicationId).containsKey(containerID);
-  }
+      ContainerTokenIdentifier containerTokenIdentifier) {
 
-  private synchronized void addKeyForContainerId(ContainerId containerId,
-      MasterKeyData masterKeyData) {
-    if (containerId != null) {
-      ApplicationId appId =
-          containerId.getApplicationAttemptId().getApplicationId();
-      if (!this.oldMasterKeys.containsKey(appId)) {
-        this.oldMasterKeys.put(appId,
-          new ConcurrentHashMap<ContainerId, MasterKeyData>());
-      }
-      ConcurrentMap<ContainerId, MasterKeyData> containerIdToKeysMapForThisApp =
-          this.oldMasterKeys.get(appId);
-      containerIdToKeysMapForThisApp.put(containerId, masterKeyData);
+    removeAnyContainerTokenIfExpired();
+
+    Long expTime = containerTokenIdentifier.getExpiryTimeStamp();
+    List<ContainerId> containers =
+        this.recentlyStartedContainerTracker.get(expTime);
+    if (containers == null
+        || !containers.contains(containerTokenIdentifier.getContainerID())) {
+      return true;
     } else {
-      LOG.warn("Not adding key for null containerId");
+      return false;
     }
   }
 
-  // Holding on to master-keys corresponding to containers until the app is
-  // finished due to the multiple ways a container can finish. Avoid
-  // stopContainer calls seeing unnecessary authorization exceptions.
-  public synchronized void appFinished(ApplicationId appId) {
-    this.oldMasterKeys.remove(appId);
-  }
-  
   public synchronized void setNodeId(NodeId nodeId) {
     nodeHostAddr = nodeId.toString();
     LOG.info("Updating node address : " + nodeHostAddr);
-  } 
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Mon Aug 12 21:25:49 2013
@@ -195,14 +195,13 @@ public class DummyContainerManager exten
   
   @Override
   protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
-      ContainerTokenIdentifier containerTokenIdentifier,
-      UserGroupInformation ugi) throws YarnException {
+      ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
     // do nothing
   }
   
   @Override
   protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
-      Container container, boolean stopRequest) throws YarnException {
+      Container container, boolean stopRequest, NMTokenIdentifier identifier) throws YarnException {
     // do nothing
   }
 

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java Mon Aug 12 21:25:49 2013
@@ -61,6 +61,10 @@ public class MockNodeStatusUpdater exten
   protected ResourceTracker getRMClient() {
     return resourceTracker;
   }
+  @Override
+  protected void stopRMProxy() {
+    return;
+  }
   
   private static class MockResourceTracker implements ResourceTracker {
     private int heartBeatID;

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java Mon Aug 12 21:25:49 2013
@@ -146,7 +146,7 @@ public class TestContainerManagerWithLCE
 
   @Override
   public void testContainerLaunchFromPreviousRM() throws InterruptedException,
-      IOException {
+      IOException, YarnException {
     // Don't run the test if the binary is not available.
     if (!shouldRunTest()) {
       LOG.info("LCE binary path is not passed. Not running the test");
@@ -155,6 +155,29 @@ public class TestContainerManagerWithLCE
     LOG.info("Running testContainerLaunchFromPreviousRM");
     super.testContainerLaunchFromPreviousRM();
   }
+
+  @Override
+  public void testMultipleContainersLaunch() throws Exception {
+    // Don't run the test if the binary is not available.
+    if (!shouldRunTest()) {
+      LOG.info("LCE binary path is not passed. Not running the test");
+      return;
+    }
+    LOG.info("Running testMultipleContainersLaunch");
+    super.testMultipleContainersLaunch();
+  }
+
+  @Override
+  public void testMultipleContainersStopAndGetStatus() throws Exception {
+    // Don't run the test if the binary is not available.
+    if (!shouldRunTest()) {
+      LOG.info("LCE binary path is not passed. Not running the test");
+      return;
+    }
+    LOG.info("Running testMultipleContainersStopAndGetStatus");
+    super.testMultipleContainersStopAndGetStatus();
+  }
+
   private boolean shouldRunTest() {
     return System
         .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java Mon Aug 12 21:25:49 2013
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -28,16 +33,11 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-
-
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
 import org.junit.AfterClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import static org.junit.Assert.*;
-
 public class TestDeletionService {
 
   private static final FileContext lfs = getLfs();
@@ -210,4 +210,79 @@ public class TestDeletionService {
     }
     assertTrue(del.isTerminated());
   }
+  
+  @Test (timeout=60000)
+  public void testFileDeletionTaskDependency() throws Exception {
+    FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor();
+    Configuration conf = new Configuration();
+    exec.setConf(conf);
+    DeletionService del = new DeletionService(exec);
+    del.init(conf);
+    del.start();
+    // The finally block below stops the service even if an assertion fails.
+    try {
+      Random r = new Random();
+      long seed = r.nextLong();
+      r.setSeed(seed);
+      System.out.println("SEED: " + seed);
+      List<Path> dirs = buildDirs(r, base, 2);
+      createDirs(new Path("."), dirs);
+      // dirs.get(0) drives the success case; dirs.get(1) the failure case.
+      // first we will try to delete sub directories which are present. This
+      // should then trigger parent directory to be deleted.
+      List<Path> subDirs = buildDirs(r, dirs.get(0), 2);
+      // The parent task is only attached as a dependency, never scheduled here.
+      FileDeletionTask dependentDeletionTask =
+          del.createFileDeletionTask(null, dirs.get(0), new Path[] {});
+      List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
+      for (Path subDir : subDirs) {
+        FileDeletionTask deletionTask =
+            del.createFileDeletionTask(null, null, new Path[] { subDir });
+        deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+        deletionTasks.add(deletionTask);
+      }
+      for (FileDeletionTask task : deletionTasks) {
+        del.scheduleFileDeletionTask(task);
+      }
+
+      int msecToWait = 20 * 1000;
+      while (msecToWait > 0 && (lfs.util().exists(dirs.get(0)))) {
+        Thread.sleep(100);
+        msecToWait -= 100;
+      }
+      assertFalse(lfs.util().exists(dirs.get(0)));
+      // Success case verified: the parent directory no longer exists.
+
+      // Now we will try to delete sub directories; one of the deletion task we
+      // will mark as failure and then parent directory should not be deleted.
+      subDirs = buildDirs(r, dirs.get(1), 2);
+      subDirs.add(new Path(dirs.get(1), "absentFile"));
+      // NOTE(review): "absentFile" is presumably index 2, the task failed below.
+      dependentDeletionTask =
+          del.createFileDeletionTask(null, dirs.get(1), new Path[] {});
+      deletionTasks = new ArrayList<FileDeletionTask>();
+      for (Path subDir : subDirs) {
+        FileDeletionTask deletionTask =
+            del.createFileDeletionTask(null, null, new Path[] { subDir });
+        deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+        deletionTasks.add(deletionTask);
+      }
+      // marking one of the tasks as a failure.
+      deletionTasks.get(2).setSuccess(false);
+      for (FileDeletionTask task : deletionTasks) {
+        del.scheduleFileDeletionTask(task);
+      }
+
+      msecToWait = 20 * 1000;
+      while (msecToWait > 0
+          && (lfs.util().exists(subDirs.get(0)) || lfs.util().exists(
+            subDirs.get(1)))) {
+        Thread.sleep(100);
+        msecToWait -= 100;
+      }
+      assertTrue(lfs.util().exists(dirs.get(1)));
+    } finally {
+      del.stop();
+    }
+  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Mon Aug 12 21:25:49 2013
@@ -20,18 +20,19 @@ package org.apache.hadoop.yarn.server.no
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -46,7 +47,6 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Test;
 
 
@@ -62,7 +62,7 @@ public class TestEventFlow {
   private static File remoteLogDir = new File("target",
       TestEventFlow.class.getName() + "-remoteLogDir").getAbsoluteFile();
   private static final long SIMULATED_RM_IDENTIFIER = 1234;
-
+  
   @Test
   public void testSuccessfulContainerLaunch() throws InterruptedException,
       IOException, YarnException {
@@ -107,6 +107,11 @@ public class TestEventFlow {
         return new LocalRMInterface();
       };
 
+          @Override
+          protected void stopRMProxy() {
+            return; // No-op override: nothing is stopped here in this test.
+          }
+
       @Override
       protected void startStatusUpdater() {
         return; // Don't start any updating thread.
@@ -135,21 +140,25 @@ public class TestEventFlow {
     ContainerId cID = ContainerId.newInstance(applicationAttemptId, 0);
 
     String user = "testing";
-    StartContainerRequest request = 
-        recordFactory.newRecordInstance(StartContainerRequest.class);
-    request.setContainerLaunchContext(launchContext);
-    request.setContainerToken(TestContainerManager.createContainerToken(cID,
-      SIMULATED_RM_IDENTIFIER, context.getNodeId(), user,
-      context.getContainerTokenSecretManager()));
-    containerManager.startContainer(request);
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(launchContext,
+          TestContainerManager.createContainerToken(cID,
+            SIMULATED_RM_IDENTIFIER, context.getNodeId(), user,
+            context.getContainerTokenSecretManager()));
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
 
     BaseContainerManagerTest.waitForContainerState(containerManager, cID,
         ContainerState.RUNNING);
 
-    StopContainerRequest stopRequest = 
-        recordFactory.newRecordInstance(StopContainerRequest.class);
-    stopRequest.setContainerId(cID);
-    containerManager.stopContainer(stopRequest);
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(cID);
+    StopContainersRequest stopRequest =
+        StopContainersRequest.newInstance(containerIds);
+    containerManager.stopContainers(stopRequest);
     BaseContainerManagerTest.waitForContainerState(containerManager, cID,
         ContainerState.COMPLETE);
 

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNMAuditLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNMAuditLogger.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNMAuditLogger.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNMAuditLogger.java Mon Aug 12 21:25:49 2013
@@ -204,8 +204,10 @@ public class TestNMAuditLogger {
   public void testNMAuditLoggerWithIP() throws Exception {
     Configuration conf = new Configuration();
     // start the IPC server
-    Server server = RPC.getServer(TestProtocol.class,
-        new MyTestRPCServer(), "0.0.0.0", 0, 5, true, conf, null);
+    Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+        .setInstance(new MyTestRPCServer()).setBindAddress("0.0.0.0")
+        .setPort(0).setNumHandlers(5).setVerbose(true).build();
+
     server.start();
 
     InetSocketAddress addr = NetUtils.getConnectAddress(server);

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java Mon Aug 12 21:25:49 2013
@@ -39,8 +39,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -54,6 +55,7 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -69,8 +71,8 @@ import org.mockito.ArgumentMatcher;
 
 public class TestNodeManagerReboot {
 
-  static final File basedir =
-      new File("target", TestNodeManagerReboot.class.getName());
+  static final File basedir = new File("target",
+    TestNodeManagerReboot.class.getName());
   static final File logsDir = new File(basedir, "logs");
   static final File nmLocalDir = new File(basedir, "nm0");
   static final File localResourceDir = new File(basedir, "resource");
@@ -100,7 +102,8 @@ public class TestNodeManagerReboot {
     nm = new MyNodeManager();
     nm.start();
 
-    final ContainerManagementProtocol containerManager = nm.getContainerManager();
+    final ContainerManagementProtocol containerManager =
+        nm.getContainerManager();
 
     // create files under fileCache
     createFiles(nmLocalDir.getAbsolutePath(), ContainerLocalizer.FILECACHE, 100);
@@ -112,16 +115,13 @@ public class TestNodeManagerReboot {
     ContainerId cId = createContainerId();
 
     URL localResourceUri =
-        ConverterUtils.getYarnUrlFromPath(localFS
-            .makeQualified(new Path(localResourceDir.getAbsolutePath())));
+        ConverterUtils.getYarnUrlFromPath(localFS.makeQualified(new Path(
+          localResourceDir.getAbsolutePath())));
 
     LocalResource localResource =
-        Records.newRecord(LocalResource.class);
-    localResource.setResource(localResourceUri);
-    localResource.setSize(-1);
-    localResource.setVisibility(LocalResourceVisibility.APPLICATION);
-    localResource.setType(LocalResourceType.FILE);
-    localResource.setTimestamp(localResourceDir.lastModified());
+        LocalResource.newInstance(localResourceUri, LocalResourceType.FILE,
+          LocalResourceVisibility.APPLICATION, -1,
+          localResourceDir.lastModified());
     String destinationFile = "dest_file";
     Map<String, LocalResource> localResources =
         new HashMap<String, LocalResource>();
@@ -129,32 +129,38 @@ public class TestNodeManagerReboot {
     containerLaunchContext.setLocalResources(localResources);
     List<String> commands = new ArrayList<String>();
     containerLaunchContext.setCommands(commands);
-    
-    final StartContainerRequest startRequest =
-        Records.newRecord(StartContainerRequest.class);
-    startRequest.setContainerLaunchContext(containerLaunchContext);
+
     NodeId nodeId = nm.getNMContext().getNodeId();
-    startRequest.setContainerToken(TestContainerManager.createContainerToken(
-      cId, 0, nodeId, destinationFile, nm.getNMContext()
-        .getContainerTokenSecretManager()));
-    final UserGroupInformation currentUser = UserGroupInformation
-        .createRemoteUser(cId.getApplicationAttemptId().toString());
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(containerLaunchContext,
+          TestContainerManager.createContainerToken(
+            cId, 0, nodeId, destinationFile, nm.getNMContext()
+              .getContainerTokenSecretManager()));
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(scRequest);
+    final StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+
+    final UserGroupInformation currentUser =
+        UserGroupInformation.createRemoteUser(cId.getApplicationAttemptId()
+          .toString());
     NMTokenIdentifier nmIdentifier =
         new NMTokenIdentifier(cId.getApplicationAttemptId(), nodeId, user, 123);
     currentUser.addTokenIdentifier(nmIdentifier);
     currentUser.doAs(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws YarnException, IOException {
-        nm.getContainerManager().startContainer(startRequest);
+        nm.getContainerManager().startContainers(allRequests);
         return null;
       }
     });
 
-    GetContainerStatusRequest request =
-        Records.newRecord(GetContainerStatusRequest.class);
-    request.setContainerId(cId);
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(cId);
+    GetContainerStatusesRequest request =
+        GetContainerStatusesRequest.newInstance(containerIds);
     Container container =
-        nm.getNMContext().getContainers().get(request.getContainerId());
+        nm.getNMContext().getContainers().get(request.getContainerIds().get(0));
 
     final int MAX_TRIES = 20;
     int numTries = 0;
@@ -170,27 +176,31 @@ public class TestNodeManagerReboot {
 
     Assert.assertEquals(ContainerState.DONE, container.getContainerState());
 
-    Assert.assertTrue(
-        "The container should create a subDir named currentUser: " + user +
-            "under localDir/usercache",
+    Assert
+      .assertTrue(
+        "The container should create a subDir named currentUser: " + user
+            + "under localDir/usercache",
         numOfLocalDirs(nmLocalDir.getAbsolutePath(),
-            ContainerLocalizer.USERCACHE) > 0);
+          ContainerLocalizer.USERCACHE) > 0);
 
-    Assert.assertTrue("There should be files or Dirs under nm_private when " +
-        "container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+    Assert.assertTrue(
+      "There should be files or Dirs under nm_private when "
+          + "container is launched",
+      numOfLocalDirs(nmLocalDir.getAbsolutePath(),
         ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
 
     // restart the NodeManager
     nm.stop();
     nm = new MyNodeManager();
-    nm.start();    
+    nm.start();
 
     numTries = 0;
-    while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
-        .USERCACHE) > 0 || numOfLocalDirs(nmLocalDir.getAbsolutePath(),
-        ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(nmLocalDir
-        .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
-        > 0) && numTries < MAX_TRIES) {
+    while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+      ContainerLocalizer.USERCACHE) > 0
+        || numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+          ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(
+      nmLocalDir.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) > 0)
+        && numTries < MAX_TRIES) {
       try {
         Thread.sleep(500);
       } catch (InterruptedException ex) {
@@ -199,21 +209,27 @@ public class TestNodeManagerReboot {
       numTries++;
     }
 
-    Assert.assertTrue("After NM reboots, all local files should be deleted",
-        numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
-            .USERCACHE) == 0 && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
-            ContainerLocalizer.FILECACHE) == 0 && numOfLocalDirs(nmLocalDir
-            .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
-              == 0);
+    Assert
+      .assertTrue(
+        "After NM reboots, all local files should be deleted",
+        numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+          ContainerLocalizer.USERCACHE) == 0
+            && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+              ContainerLocalizer.FILECACHE) == 0
+            && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+              ResourceLocalizationService.NM_PRIVATE_DIR) == 0);
     verify(delService, times(1)).delete(
-        (String) isNull(),
-        argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
-            + "_DEL_")));
+      (String) isNull(),
+      argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
+          + "_DEL_")));
     verify(delService, times(1)).delete((String) isNull(),
-        argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_")));
-    verify(delService, times(1)).delete((String) isNull(),
-        argThat(new PathInclude(ContainerLocalizer.USERCACHE + "_DEL_")));
-
+      argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_")));
+    verify(delService, times(1)).scheduleFileDeletionTask(
+      argThat(new FileDeletionInclude(user, null,
+        new String[] { destinationFile })));
+    verify(delService, times(1)).scheduleFileDeletionTask(
+      argThat(new FileDeletionInclude(null, ContainerLocalizer.USERCACHE
+          + "_DEL_", new String[] {})));
   }
 
   private int numOfLocalDirs(String localDir, String localSubDir) {
@@ -238,7 +254,8 @@ public class TestNodeManagerReboot {
 
   private ContainerId createContainerId() {
     ApplicationId appId = ApplicationId.newInstance(0, 0);
-    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
     ContainerId containerId = ContainerId.newInstance(appAttemptId, 0);
     return containerId;
   }
@@ -253,8 +270,8 @@ public class TestNodeManagerReboot {
     @Override
     protected NodeStatusUpdater createNodeStatusUpdater(Context context,
         Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
-      MockNodeStatusUpdater myNodeStatusUpdater = new MockNodeStatusUpdater(
-          context, dispatcher, healthChecker, metrics);
+      MockNodeStatusUpdater myNodeStatusUpdater =
+          new MockNodeStatusUpdater(context, dispatcher, healthChecker, metrics);
       return myNodeStatusUpdater;
     }
 
@@ -288,4 +305,58 @@ public class TestNodeManagerReboot {
       return ((Path) o).getName().indexOf(part) != -1;
     }
   }
+  
+  class FileDeletionInclude extends ArgumentMatcher<FileDeletionTask> {
+    final String user;
+    final String subDirIncludes;
+    final String[] baseDirIncludes;
+    // Expected user, sub-dir path fragment and per-index base-dir path fragments.
+    public FileDeletionInclude(String user, String subDirIncludes,
+        String [] baseDirIncludes) {
+      this.user = user;
+      this.subDirIncludes = subDirIncludes;
+      this.baseDirIncludes = baseDirIncludes;
+    }
+    // Matches only when the user, the sub-dir and every base dir all agree.
+    @Override
+    public boolean matches(Object o) {
+      FileDeletionTask fd = (FileDeletionTask)o;
+      if (fd.getUser() == null && user != null) {
+        return false;
+      } else if (fd.getUser() != null && user == null) {
+        return false;
+      } else if (fd.getUser() != null && !fd.getUser().equals(user)) {
+        return false; // an equal user must still pass the path checks below
+      }
+      if (!comparePaths(fd.getSubDir(), subDirIncludes)) {
+        return false;
+      }
+      if (baseDirIncludes == null && fd.getBaseDirs() != null) {
+        return false;
+      } else if (baseDirIncludes != null && fd.getBaseDirs() == null ) {
+        return false;
+      } else if (baseDirIncludes != null && fd.getBaseDirs() != null) {
+        if (baseDirIncludes.length != fd.getBaseDirs().size()) {
+          return false;
+        }
+        for (int i =0 ; i < baseDirIncludes.length; i++) {
+          if (!comparePaths(fd.getBaseDirs().get(i), baseDirIncludes[i])) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+    // Null-safe: true if both are null, or p1's URI path contains fragment p2.
+    public boolean comparePaths(Path p1, String p2) {
+      if (p1 == null && p2 != null){
+        return false;
+      } else if (p1 != null && p2 == null) {
+        return false;
+      } else if (p1 != null && p2 != null ){
+        return p1.toUri().getPath().contains(p2);
+      }
+      return true;
+    }
+  }
 }