You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by li...@apache.org on 2017/06/06 08:28:16 UTC

[26/50] [abbrv] hadoop git commit: YARN-6366. Refactor the NodeManager DeletionService to support additional DeletionTask types. Contributed by Shane Kumpf.

YARN-6366. Refactor the NodeManager DeletionService to support additional DeletionTask types. Contributed by Shane Kumpf.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/547f18cb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/547f18cb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/547f18cb

Branch: refs/heads/HADOOP-13345
Commit: 547f18cb96aeda55cc19b38be2be4d631b3a5f4f
Parents: 4b4a652
Author: Varun Vasudev <vv...@apache.org>
Authored: Wed May 31 16:15:35 2017 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Wed May 31 16:15:35 2017 +0530

----------------------------------------------------------------------
 .../server/nodemanager/DeletionService.java     | 468 ++++---------------
 .../nodemanager/api/impl/pb/NMProtoUtils.java   | 110 +++++
 .../nodemanager/api/impl/pb/package-info.java   |  25 +
 .../recovery/DeletionTaskRecoveryInfo.java      |  73 +++
 .../deletion/recovery/package-info.java         |  25 +
 .../deletion/task/DeletionTask.java             | 258 ++++++++++
 .../deletion/task/DeletionTaskType.java         |  24 +
 .../deletion/task/FileDeletionTask.java         | 202 ++++++++
 .../deletion/task/package-info.java             |  25 +
 .../localizer/LocalResourcesTrackerImpl.java    |  13 +-
 .../localizer/ResourceLocalizationService.java  |  40 +-
 .../logaggregation/AppLogAggregatorImpl.java    |  60 ++-
 .../loghandler/NonAggregatingLogHandler.java    |   7 +-
 .../yarn_server_nodemanager_recovery.proto      |   1 +
 .../server/nodemanager/TestDeletionService.java |  57 ++-
 .../nodemanager/TestNodeManagerReboot.java      |  99 +---
 .../api/impl/pb/TestNMProtoUtils.java           |  91 ++++
 .../BaseContainerManagerTest.java               |   7 +-
 .../deletion/task/FileDeletionMatcher.java      |  84 ++++
 .../deletion/task/TestFileDeletionTask.java     |  85 ++++
 .../TestLocalResourcesTrackerImpl.java          |   5 +-
 .../TestResourceLocalizationService.java        |  33 +-
 .../TestAppLogAggregatorImpl.java               |  15 +-
 .../TestLogAggregationService.java              |  17 +-
 .../TestNonAggregatingLogHandler.java           |   8 +-
 25 files changed, 1274 insertions(+), 558 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
index aac0af9..38d69a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
@@ -21,11 +21,8 @@ package org.apache.hadoop.yarn.server.nodemanager;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,461 +35,176 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
-import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
+import org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.NMProtoUtils;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery.DeletionTaskRecoveryInfo;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class DeletionService extends AbstractService {
-  static final Log LOG = LogFactory.getLog(DeletionService.class);
+
+  private static final Log LOG = LogFactory.getLog(DeletionService.class);
+
   private int debugDelay;
-  private final ContainerExecutor exec;
-  private ScheduledThreadPoolExecutor sched;
-  private static final FileContext lfs = getLfs();
+  private final ContainerExecutor containerExecutor;
   private final NMStateStoreService stateStore;
+  private ScheduledThreadPoolExecutor sched;
   private AtomicInteger nextTaskId = new AtomicInteger(0);
 
-  static final FileContext getLfs() {
-    try {
-      return FileContext.getLocalFSFileContext();
-    } catch (UnsupportedFileSystemException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   public DeletionService(ContainerExecutor exec) {
     this(exec, new NMNullStateStoreService());
   }
 
-  public DeletionService(ContainerExecutor exec,
+  public DeletionService(ContainerExecutor containerExecutor,
       NMStateStoreService stateStore) {
     super(DeletionService.class.getName());
-    this.exec = exec;
+    this.containerExecutor = containerExecutor;
     this.debugDelay = 0;
     this.stateStore = stateStore;
   }
-  
-  /**
-   * Delete the path(s) as this user.
-   * @param user The user to delete as, or the JVM user if null
-   * @param subDir the sub directory name
-   * @param baseDirs the base directories which contains the subDir's
-   */
-  public void delete(String user, Path subDir, Path... baseDirs) {
-    // TODO if parent owned by NM, rename within parent inline
-    if (debugDelay != -1) {
-      List<Path> baseDirList = null;
-      if (baseDirs != null && baseDirs.length != 0) {
-        baseDirList = Arrays.asList(baseDirs);
-      }
-      FileDeletionTask task =
-          new FileDeletionTask(this, user, subDir, baseDirList);
-      recordDeletionTaskInStateStore(task);
-      sched.schedule(task, debugDelay, TimeUnit.SECONDS);
-    }
-  }
-  
-  public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) {
-    if (debugDelay != -1) {
-      recordDeletionTaskInStateStore(fileDeletionTask);
-      sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS);
-    }
-  }
-  
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    ThreadFactory tf = new ThreadFactoryBuilder()
-      .setNameFormat("DeletionService #%d")
-      .build();
-    if (conf != null) {
-      sched = new HadoopScheduledThreadPoolExecutor(
-          conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
-          YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf);
-      debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
-    } else {
-      sched = new HadoopScheduledThreadPoolExecutor(
-          YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf);
-    }
-    sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
-    sched.setKeepAliveTime(60L, SECONDS);
-    if (stateStore.canRecover()) {
-      recover(stateStore.loadDeletionServiceState());
-    }
-    super.serviceInit(conf);
-  }
 
-  @Override
-  protected void serviceStop() throws Exception {
-    if (sched != null) {
-      sched.shutdown();
-      boolean terminated = false;
-      try {
-        terminated = sched.awaitTermination(10, SECONDS);
-      } catch (InterruptedException e) {
-      }
-      if (terminated != true) {
-        sched.shutdownNow();
-      }
-    }
-    super.serviceStop();
+  public int getDebugDelay() {
+    return debugDelay;
   }
 
-  /**
-   * Determine if the service has completely stopped.
-   * Used only by unit tests
-   * @return true if service has completely stopped
-   */
-  @Private
-  public boolean isTerminated() {
-    return getServiceState() == STATE.STOPPED && sched.isTerminated();
+  public ContainerExecutor getContainerExecutor() {
+    return containerExecutor;
   }
 
-  public static class FileDeletionTask implements Runnable {
-    public static final int INVALID_TASK_ID = -1;
-    private int taskId;
-    private final String user;
-    private final Path subDir;
-    private final List<Path> baseDirs;
-    private final AtomicInteger numberOfPendingPredecessorTasks;
-    private final Set<FileDeletionTask> successorTaskSet;
-    private final DeletionService delService;
-    // By default all tasks will start as success=true; however if any of
-    // the dependent task fails then it will be marked as false in
-    // fileDeletionTaskFinished().
-    private boolean success;
-    
-    private FileDeletionTask(DeletionService delService, String user,
-        Path subDir, List<Path> baseDirs) {
-      this(INVALID_TASK_ID, delService, user, subDir, baseDirs);
-    }
-
-    private FileDeletionTask(int taskId, DeletionService delService,
-        String user, Path subDir, List<Path> baseDirs) {
-      this.taskId = taskId;
-      this.delService = delService;
-      this.user = user;
-      this.subDir = subDir;
-      this.baseDirs = baseDirs;
-      this.successorTaskSet = new HashSet<FileDeletionTask>();
-      this.numberOfPendingPredecessorTasks = new AtomicInteger(0);
-      success = true;
-    }
-    
-    /**
-     * increments and returns pending predecessor task count
-     */
-    public int incrementAndGetPendingPredecessorTasks() {
-      return numberOfPendingPredecessorTasks.incrementAndGet();
-    }
-    
-    /**
-     * decrements and returns pending predecessor task count
-     */
-    public int decrementAndGetPendingPredecessorTasks() {
-      return numberOfPendingPredecessorTasks.decrementAndGet();
-    }
-    
-    @VisibleForTesting
-    public String getUser() {
-      return this.user;
-    }
-    
-    @VisibleForTesting
-    public Path getSubDir() {
-      return this.subDir;
-    }
-    
-    @VisibleForTesting
-    public List<Path> getBaseDirs() {
-      return this.baseDirs;
-    }
-    
-    public synchronized void setSuccess(boolean success) {
-      this.success = success;
-    }
-    
-    public synchronized boolean getSucess() {
-      return this.success;
-    }
-    
-    public synchronized FileDeletionTask[] getSuccessorTasks() {
-      FileDeletionTask[] successors =
-          new FileDeletionTask[successorTaskSet.size()];
-      return successorTaskSet.toArray(successors);
-    }
+  public NMStateStoreService getStateStore() {
+    return stateStore;
+  }
 
-    @Override
-    public void run() {
+  public void delete(DeletionTask deletionTask) {
+    if (debugDelay != -1) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug(this);
-      }
-      boolean error = false;
-      if (null == user) {
-        if (baseDirs == null || baseDirs.size() == 0) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("NM deleting absolute path : " + subDir);
-          }
-          try {
-            lfs.delete(subDir, true);
-          } catch (IOException e) {
-            error = true;
-            LOG.warn("Failed to delete " + subDir);
-          }
-        } else {
-          for (Path baseDir : baseDirs) {
-            Path del = subDir == null? baseDir : new Path(baseDir, subDir);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("NM deleting path : " + del);
-            }
-            try {
-              lfs.delete(del, true);
-            } catch (IOException e) {
-              error = true;
-              LOG.warn("Failed to delete " + subDir);
-            }
-          }
-        }
-      } else {
-        try {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                "Deleting path: [" + subDir + "] as user: [" + user + "]");
-          }
-          if (baseDirs == null || baseDirs.size() == 0) {
-            delService.exec.deleteAsUser(new DeletionAsUserContext.Builder()
-                .setUser(user)
-                .setSubDir(subDir)
-                .build());
-          } else {
-            delService.exec.deleteAsUser(new DeletionAsUserContext.Builder()
-                .setUser(user)
-                .setSubDir(subDir)
-                .setBasedirs(baseDirs.toArray(new Path[0]))
-                .build());
-          }
-        } catch (IOException e) {
-          error = true;
-          LOG.warn("Failed to delete as user " + user, e);
-        } catch (InterruptedException e) {
-          error = true;
-          LOG.warn("Failed to delete as user " + user, e);
-        }
-      }
-      if (error) {
-        setSuccess(!error);        
-      }
-      fileDeletionTaskFinished();
-    }
-
-    @Override
-    public String toString() {
-      StringBuffer sb = new StringBuffer("\nFileDeletionTask : ");
-      sb.append("  user : ").append(this.user);
-      sb.append("  subDir : ").append(
-        subDir == null ? "null" : subDir.toString());
-      sb.append("  baseDir : ");
-      if (baseDirs == null || baseDirs.size() == 0) {
-        sb.append("null");
-      } else {
-        for (Path baseDir : baseDirs) {
-          sb.append(baseDir.toString()).append(',');
-        }
-      }
-      return sb.toString();
-    }
-    
-    /**
-     * If there is a task dependency between say tasks 1,2,3 such that
-     * task2 and task3 can be started only after task1 then we should define
-     * task2 and task3 as successor tasks for task1.
-     * Note:- Task dependency should be defined prior to
-     * @param successorTask
-     */
-    public synchronized void addFileDeletionTaskDependency(
-        FileDeletionTask successorTask) {
-      if (successorTaskSet.add(successorTask)) {
-        successorTask.incrementAndGetPendingPredecessorTasks();
+        String msg = String.format("Scheduling DeletionTask (delay %d) : %s",
+            debugDelay, deletionTask.toString());
+        LOG.debug(msg);
       }
+      recordDeletionTaskInStateStore(deletionTask);
+      sched.schedule(deletionTask, debugDelay, TimeUnit.SECONDS);
     }
-    
-    /*
-     * This is called when
-     * 1) Current file deletion task ran and finished.
-     * 2) This can be even directly called by predecessor task if one of the
-     * dependent tasks of it has failed marking its success = false.  
-     */
-    private synchronized void fileDeletionTaskFinished() {
-      try {
-        delService.stateStore.removeDeletionTask(taskId);
-      } catch (IOException e) {
-        LOG.error("Unable to remove deletion task " + taskId
-            + " from state store", e);
-      }
-      Iterator<FileDeletionTask> successorTaskI =
-          this.successorTaskSet.iterator();
-      while (successorTaskI.hasNext()) {
-        FileDeletionTask successorTask = successorTaskI.next();
-        if (!success) {
-          successorTask.setSuccess(success);
-        }
-        int count = successorTask.decrementAndGetPendingPredecessorTasks();
-        if (count == 0) {
-          if (successorTask.getSucess()) {
-            successorTask.delService.scheduleFileDeletionTask(successorTask);
-          } else {
-            successorTask.fileDeletionTaskFinished();
-          }
-        }
-      }
-    }
-  }
-  
-  /**
-   * Helper method to create file deletion task. To be used only if we need
-   * a way to define dependencies between deletion tasks.
-   * @param user user on whose behalf this task is suppose to run
-   * @param subDir sub directory as required in 
-   * {@link DeletionService#delete(String, Path, Path...)}
-   * @param baseDirs base directories as required in
-   * {@link DeletionService#delete(String, Path, Path...)}
-   */
-  public FileDeletionTask createFileDeletionTask(String user, Path subDir,
-      Path[] baseDirs) {
-    return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs));
   }
 
-  private void recover(RecoveredDeletionServiceState state)
+  private void recover(NMStateStoreService.RecoveredDeletionServiceState state)
       throws IOException {
     List<DeletionServiceDeleteTaskProto> taskProtos = state.getTasks();
     Map<Integer, DeletionTaskRecoveryInfo> idToInfoMap =
-        new HashMap<Integer, DeletionTaskRecoveryInfo>(taskProtos.size());
-    Set<Integer> successorTasks = new HashSet<Integer>();
+        new HashMap<>(taskProtos.size());
+    Set<Integer> successorTasks = new HashSet<>();
     for (DeletionServiceDeleteTaskProto proto : taskProtos) {
-      DeletionTaskRecoveryInfo info = parseTaskProto(proto);
-      idToInfoMap.put(info.task.taskId, info);
-      nextTaskId.set(Math.max(nextTaskId.get(), info.task.taskId));
-      successorTasks.addAll(info.successorTaskIds);
+      DeletionTaskRecoveryInfo info =
+          NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this);
+      idToInfoMap.put(info.getTask().getTaskId(), info);
+      nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId()));
+      successorTasks.addAll(info.getSuccessorTaskIds());
     }
 
     // restore the task dependencies and schedule the deletion tasks that
     // have no predecessors
     final long now = System.currentTimeMillis();
     for (DeletionTaskRecoveryInfo info : idToInfoMap.values()) {
-      for (Integer successorId : info.successorTaskIds){
+      for (Integer successorId : info.getSuccessorTaskIds()){
         DeletionTaskRecoveryInfo successor = idToInfoMap.get(successorId);
         if (successor != null) {
-          info.task.addFileDeletionTaskDependency(successor.task);
+          info.getTask().addDeletionTaskDependency(successor.getTask());
         } else {
           LOG.error("Unable to locate dependency task for deletion task "
-              + info.task.taskId + " at " + info.task.getSubDir());
+              + info.getTask().getTaskId());
         }
       }
-      if (!successorTasks.contains(info.task.taskId)) {
-        long msecTilDeletion = info.deletionTimestamp - now;
-        sched.schedule(info.task, msecTilDeletion, TimeUnit.MILLISECONDS);
+      if (!successorTasks.contains(info.getTask().getTaskId())) {
+        long msecTilDeletion = info.getDeletionTimestamp() - now;
+        sched.schedule(info.getTask(), msecTilDeletion, TimeUnit.MILLISECONDS);
       }
     }
   }
 
-  private DeletionTaskRecoveryInfo parseTaskProto(
-      DeletionServiceDeleteTaskProto proto) throws IOException {
-    int taskId = proto.getId();
-    String user = proto.hasUser() ? proto.getUser() : null;
-    Path subdir = null;
-    List<Path> basePaths = null;
-    if (proto.hasSubdir()) {
-      subdir = new Path(proto.getSubdir());
-    }
-    List<String> basedirs = proto.getBasedirsList();
-    if (basedirs != null && basedirs.size() > 0) {
-      basePaths = new ArrayList<Path>(basedirs.size());
-      for (String basedir : basedirs) {
-        basePaths.add(new Path(basedir));
-      }
-    }
-
-    FileDeletionTask task = new FileDeletionTask(taskId, this, user,
-        subdir, basePaths);
-    return new DeletionTaskRecoveryInfo(task,
-        proto.getSuccessorIdsList(),
-        proto.getDeletionTime());
-  }
-
   private int generateTaskId() {
     // get the next ID but avoid an invalid ID
     int taskId = nextTaskId.incrementAndGet();
-    while (taskId == FileDeletionTask.INVALID_TASK_ID) {
+    while (taskId == DeletionTask.INVALID_TASK_ID) {
       taskId = nextTaskId.incrementAndGet();
     }
     return taskId;
   }
 
-  private void recordDeletionTaskInStateStore(FileDeletionTask task) {
+  private void recordDeletionTaskInStateStore(DeletionTask task) {
     if (!stateStore.canRecover()) {
       // optimize the case where we aren't really recording
       return;
     }
-    if (task.taskId != FileDeletionTask.INVALID_TASK_ID) {
+    if (task.getTaskId() != DeletionTask.INVALID_TASK_ID) {
       return;  // task already recorded
     }
 
-    task.taskId = generateTaskId();
-
-    FileDeletionTask[] successors = task.getSuccessorTasks();
+    task.setTaskId(generateTaskId());
 
     // store successors first to ensure task IDs have been generated for them
-    for (FileDeletionTask successor : successors) {
+    DeletionTask[] successors = task.getSuccessorTasks();
+    for (DeletionTask successor : successors) {
       recordDeletionTaskInStateStore(successor);
     }
 
-    DeletionServiceDeleteTaskProto.Builder builder =
-        DeletionServiceDeleteTaskProto.newBuilder();
-    builder.setId(task.taskId);
-    if (task.getUser() != null) {
-      builder.setUser(task.getUser());
-    }
-    if (task.getSubDir() != null) {
-      builder.setSubdir(task.getSubDir().toString());
-    }
-    builder.setDeletionTime(System.currentTimeMillis() +
-        TimeUnit.MILLISECONDS.convert(debugDelay, TimeUnit.SECONDS));
-    if (task.getBaseDirs() != null) {
-      for (Path dir : task.getBaseDirs()) {
-        builder.addBasedirs(dir.toString());
-      }
-    }
-    for (FileDeletionTask successor : successors) {
-      builder.addSuccessorIds(successor.taskId);
-    }
-
     try {
-      stateStore.storeDeletionTask(task.taskId, builder.build());
+      stateStore.storeDeletionTask(task.getTaskId(),
+          task.convertDeletionTaskToProto());
     } catch (IOException e) {
-      LOG.error("Unable to store deletion task " + task.taskId + " for "
-          + task.getSubDir(), e);
+      LOG.error("Unable to store deletion task " + task.getTaskId(), e);
     }
   }
 
-  private static class DeletionTaskRecoveryInfo {
-    FileDeletionTask task;
-    List<Integer> successorTaskIds;
-    long deletionTimestamp;
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    ThreadFactory tf = new ThreadFactoryBuilder()
+        .setNameFormat("DeletionService #%d")
+        .build();
+    if (conf != null) {
+      sched = new HadoopScheduledThreadPoolExecutor(
+          conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
+              YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf);
+      debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
+    } else {
+      sched = new HadoopScheduledThreadPoolExecutor(
+          YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf);
+    }
+    sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+    sched.setKeepAliveTime(60L, SECONDS);
+    if (stateStore.canRecover()) {
+      recover(stateStore.loadDeletionServiceState());
+    }
+    super.serviceInit(conf);
+  }
 
-    public DeletionTaskRecoveryInfo(FileDeletionTask task,
-        List<Integer> successorTaskIds, long deletionTimestamp) {
-      this.task = task;
-      this.successorTaskIds = successorTaskIds;
-      this.deletionTimestamp = deletionTimestamp;
+  @Override
+  public void serviceStop() throws Exception {
+    if (sched != null) {
+      sched.shutdown();
+      boolean terminated = false;
+      try {
+        terminated = sched.awaitTermination(10, SECONDS);
+      } catch (InterruptedException e) { }
+      if (!terminated) {
+        sched.shutdownNow();
+      }
     }
+    super.serviceStop();
+  }
+
+  /**
+   * Determine if the service has completely stopped.
+   * Used only by unit tests
+   * @return true if service has completely stopped
+   */
+  @Private
+  public boolean isTerminated() {
+    return getServiceState() == STATE.STOPPED && sched.isTerminated();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/NMProtoUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/NMProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/NMProtoUtils.java
new file mode 100644
index 0000000..e47b3ee
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/NMProtoUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery.DeletionTaskRecoveryInfo;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTaskType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utilities for converting from PB representations.
+ */
+public final class NMProtoUtils {
+
+  private static final Log LOG = LogFactory.getLog(NMProtoUtils.class);
+
+  private NMProtoUtils() { }
+
+  /**
+   * Convert the Protobuf representation into a {@link DeletionTask}.
+   *
+   * @param proto             the Protobuf representation for the DeletionTask
+   * @param deletionService   the {@link DeletionService}
+   * @return the converted {@link DeletionTask}
+   */
+  public static DeletionTask convertProtoToDeletionTask(
+      DeletionServiceDeleteTaskProto proto, DeletionService deletionService) {
+    int taskId = proto.getId();
+    if (proto.hasTaskType() && proto.getTaskType() != null) {
+      if (proto.getTaskType().equals(DeletionTaskType.FILE.name())) {
+        LOG.debug("Converting recovered FileDeletionTask");
+        return convertProtoToFileDeletionTask(proto, deletionService, taskId);
+      }
+    }
+    LOG.debug("Unable to get task type, trying FileDeletionTask");
+    return convertProtoToFileDeletionTask(proto, deletionService, taskId);
+  }
+
+  /**
+   * Convert the Protobuf representation into the {@link FileDeletionTask}.
+   *
+   * @param proto the Protobuf representation of the {@link FileDeletionTask}
+   * @param deletionService the {@link DeletionService}.
+   * @param taskId the ID of the {@link DeletionTask}.
+   * @return the populated {@link FileDeletionTask}.
+   */
+  public static FileDeletionTask convertProtoToFileDeletionTask(
+      DeletionServiceDeleteTaskProto proto, DeletionService deletionService,
+      int taskId) {
+    String user = proto.hasUser() ? proto.getUser() : null;
+    Path subdir = null;
+    if (proto.hasSubdir()) {
+      subdir = new Path(proto.getSubdir());
+    }
+    List<Path> basePaths = null;
+    List<String> basedirs = proto.getBasedirsList();
+    if (basedirs != null && basedirs.size() > 0) {
+      basePaths = new ArrayList<>(basedirs.size());
+      for (String basedir : basedirs) {
+        basePaths.add(new Path(basedir));
+      }
+    }
+    return new FileDeletionTask(taskId, deletionService, user, subdir,
+        basePaths);
+  }
+
+  /**
+   * Convert the Protobuf representation to the {@link DeletionTaskRecoveryInfo}
+   * representation.
+   *
+   * @param proto the Protobuf representation of the {@link DeletionTask}
+   * @param deletionService the {@link DeletionService}
+   * @return the populated {@link DeletionTaskRecoveryInfo}
+   */
+  public static DeletionTaskRecoveryInfo convertProtoToDeletionTaskRecoveryInfo(
+      DeletionServiceDeleteTaskProto proto, DeletionService deletionService) {
+    DeletionTask deletionTask =
+        NMProtoUtils.convertProtoToDeletionTask(proto, deletionService);
+    List<Integer> successorTaskIds = new ArrayList<>();
+    if (proto.getSuccessorIdsList() != null &&
+        !proto.getSuccessorIdsList().isEmpty()) {
+      successorTaskIds = proto.getSuccessorIdsList();
+    }
+    long deletionTimestamp = proto.getDeletionTime();
+    return new DeletionTaskRecoveryInfo(deletionTask, successorTaskIds,
+        deletionTimestamp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/package-info.java
new file mode 100644
index 0000000..006f49f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package containing classes for working with Protobuf.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/DeletionTaskRecoveryInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/DeletionTaskRecoveryInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/DeletionTaskRecoveryInfo.java
new file mode 100644
index 0000000..c62ea02
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/DeletionTaskRecoveryInfo.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
+
+import java.util.List;
+
+/**
+ * Encapsulates the recovery info needed to recover a DeletionTask from the NM
+ * state store.
+ */
+public class DeletionTaskRecoveryInfo {
+
+  private DeletionTask task;
+  private List<Integer> successorTaskIds;
+  private long deletionTimestamp;
+
+  /**
+   * Information needed for recovering the DeletionTask.
+   *
+   * @param task the DeletionTask
+   * @param successorTaskIds the dependent DeletionTasks.
+   * @param deletionTimestamp the scheduled times of deletion.
+   */
+  public DeletionTaskRecoveryInfo(DeletionTask task,
+      List<Integer> successorTaskIds, long deletionTimestamp) {
+    this.task = task;
+    this.successorTaskIds = successorTaskIds;
+    this.deletionTimestamp = deletionTimestamp;
+  }
+
+  /**
+   * Return the recovered DeletionTask.
+   *
+   * @return the recovered DeletionTask.
+   */
+  public DeletionTask getTask() {
+    return task;
+  }
+
+  /**
+   * Return all of the dependent DeletionTasks.
+   *
+   * @return the dependent DeletionTasks.
+   */
+  public List<Integer> getSuccessorTaskIds() {
+    return successorTaskIds;
+  }
+
+  /**
+   * Return the deletion timestamp.
+   *
+   * @return the deletion timestamp.
+   */
+  public long getDeletionTimestamp() {
+    return deletionTimestamp;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/package-info.java
new file mode 100644
index 0000000..28d7f62
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package containing classes for recovering DeletionTasks.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTask.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTask.java
new file mode 100644
index 0000000..635d7a9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTask.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * DeletionTasks are supplied to the {@link DeletionService} for deletion.
+ */
+public abstract class DeletionTask implements Runnable {
+
+  static final Log LOG = LogFactory.getLog(DeletionTask.class);
+
+  public static final int INVALID_TASK_ID = -1;
+
+  private int taskId;
+  private String user;
+  private DeletionTaskType deletionTaskType;
+  private DeletionService deletionService;
+  private final AtomicInteger numberOfPendingPredecessorTasks;
+  private final Set<DeletionTask> successorTaskSet;
+  // By default all tasks will start as success=true; however if any of
+  // the dependent task fails then it will be marked as false in
+  // deletionTaskFinished().
+  private boolean success;
+
+  /**
+   * Deletion task with taskId and default values.
+   *
+   * @param taskId              the ID of the task, if previously set.
+   * @param deletionService     the {@link DeletionService}.
+   * @param user                the user associated with the delete.
+   * @param deletionTaskType    the {@link DeletionTaskType}.
+   */
+  public DeletionTask(int taskId, DeletionService deletionService, String user,
+      DeletionTaskType deletionTaskType) {
+    this(taskId, deletionService, user, new AtomicInteger(0),
+        new HashSet<DeletionTask>(), deletionTaskType);
+  }
+
+  /**
+   * Deletion task with taskId and user supplied values.
+   *
+   * @param taskId              the ID of the task, if previously set.
+   * @param deletionService     the {@link DeletionService}.
+   * @param user                the user associated with the delete.
+   * @param numberOfPendingPredecessorTasks  Number of pending tasks.
+   * @param successorTaskSet    the list of successor DeletionTasks
+   * @param deletionTaskType    the {@link DeletionTaskType}.
+   */
+  public DeletionTask(int taskId, DeletionService deletionService, String user,
+      AtomicInteger numberOfPendingPredecessorTasks,
+      Set<DeletionTask> successorTaskSet, DeletionTaskType deletionTaskType) {
+    this.taskId = taskId;
+    this.deletionService = deletionService;
+    this.user = user;
+    this.numberOfPendingPredecessorTasks = numberOfPendingPredecessorTasks;
+    this.successorTaskSet = successorTaskSet;
+    this.deletionTaskType = deletionTaskType;
+    success = true;
+  }
+
+  /**
+   * Get the taskId for the DeletionTask.
+   *
+   * @return the taskId.
+   */
+  public int getTaskId() {
+    return taskId;
+  }
+
+  /**
+   * Set the taskId for the DeletionTask.
+   *
+   * @param taskId the taskId.
+   */
+  public void setTaskId(int taskId) {
+    this.taskId = taskId;
+  }
+
+  /**
+   * The the user assoicated with the DeletionTask.
+   *
+   * @return the user name.
+   */
+  public String getUser() {
+    return user;
+  }
+
+  /**
+   * Get the {@link DeletionService} for this DeletionTask.
+   *
+   * @return the {@link DeletionService}.
+   */
+  public DeletionService getDeletionService() {
+    return deletionService;
+  }
+
+  /**
+   * Get the {@link DeletionTaskType} for this DeletionTask.
+   *
+   * @return the {@link DeletionTaskType}.
+   */
+  public DeletionTaskType getDeletionTaskType() {
+    return deletionTaskType;
+  }
+
+  /**
+   * Set the DeletionTask run status.
+   *
+   * @param success the status of the running DeletionTask.
+   */
+  public synchronized void setSuccess(boolean success) {
+    this.success = success;
+  }
+
+  /**
+   * Return the DeletionTask run status.
+   *
+   * @return the status of the running DeletionTask.
+   */
+  public synchronized boolean getSucess() {
+    return this.success;
+  }
+
+  /**
+   * Return the list of successor tasks for the DeletionTask.
+   *
+   * @return the list of successor tasks.
+   */
+  public synchronized DeletionTask[] getSuccessorTasks() {
+    DeletionTask[] successors = new DeletionTask[successorTaskSet.size()];
+    return successorTaskSet.toArray(successors);
+  }
+
+  /**
+   * Convert the DeletionTask to the Protobuf representation for storing in the
+   * state store and recovery.
+   *
+   * @return the protobuf representation of the DeletionTask.
+   */
+  public abstract DeletionServiceDeleteTaskProto convertDeletionTaskToProto();
+
+  /**
+   * Add a dependent DeletionTask.
+   *
+   * If there is a task dependency between say tasks 1,2,3 such that
+   * task2 and task3 can be started only after task1 then we should define
+   * task2 and task3 as successor tasks for task1.
+   * Note:- Task dependency should be defined prior to calling delete.
+   *
+   * @param successorTask the DeletionTask the depends on this DeletionTask.
+   */
+  public synchronized void addDeletionTaskDependency(
+      DeletionTask successorTask) {
+    if (successorTaskSet.add(successorTask)) {
+      successorTask.incrementAndGetPendingPredecessorTasks();
+    }
+  }
+
+  /**
+   * Increments and returns pending predecessor task count.
+   *
+   * @return the number of pending predecessor DeletionTasks.
+   */
+  public int incrementAndGetPendingPredecessorTasks() {
+    return numberOfPendingPredecessorTasks.incrementAndGet();
+  }
+
+  /**
+   * Decrements and returns pending predecessor task count.
+   *
+   * @return the number of pending predecessor DeletionTasks.
+   */
+  public int decrementAndGetPendingPredecessorTasks() {
+    return numberOfPendingPredecessorTasks.decrementAndGet();
+  }
+
+  /**
+   * Removes the DeletionTask from the state store and validates that successor
+   * tasks have been scheduled and completed.
+   *
+   * This is called when:
+   * 1) Current deletion task ran and finished.
+   * 2) When directly called by predecessor task if one of the
+   * dependent tasks of it has failed marking its success = false.
+   */
+  synchronized void deletionTaskFinished() {
+    try {
+      NMStateStoreService stateStore = deletionService.getStateStore();
+      stateStore.removeDeletionTask(taskId);
+    } catch (IOException e) {
+      LOG.error("Unable to remove deletion task " + taskId
+          + " from state store", e);
+    }
+    Iterator<DeletionTask> successorTaskI = this.successorTaskSet.iterator();
+    while (successorTaskI.hasNext()) {
+      DeletionTask successorTask = successorTaskI.next();
+      if (!success) {
+        successorTask.setSuccess(success);
+      }
+      int count = successorTask.decrementAndGetPendingPredecessorTasks();
+      if (count == 0) {
+        if (successorTask.getSucess()) {
+          successorTask.deletionService.delete(successorTask);
+        } else {
+          successorTask.deletionTaskFinished();
+        }
+      }
+    }
+  }
+
+  /**
+   * Return the Protobuf builder with the base DeletionTask attributes.
+   *
+   * @return pre-populated Buidler with the base attributes.
+   */
+  DeletionServiceDeleteTaskProto.Builder getBaseDeletionTaskProtoBuilder() {
+    DeletionServiceDeleteTaskProto.Builder builder =
+        DeletionServiceDeleteTaskProto.newBuilder();
+    builder.setId(getTaskId());
+    if (getUser() != null) {
+      builder.setUser(getUser());
+    }
+    builder.setDeletionTime(System.currentTimeMillis() +
+        TimeUnit.MILLISECONDS.convert(getDeletionService().getDebugDelay(),
+            TimeUnit.SECONDS));
+    for (DeletionTask successor : getSuccessorTasks()) {
+      builder.addSuccessorIds(successor.getTaskId());
+    }
+    return builder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTaskType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTaskType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTaskType.java
new file mode 100644
index 0000000..676c71b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTaskType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task;
+
+/**
+ * Available types of {@link DeletionTask}s.
+ */
+public enum DeletionTaskType {
+  FILE
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java
new file mode 100644
index 0000000..fd07f16
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link DeletionTask} handling the removal of files (and directories).
+ */
+public class FileDeletionTask extends DeletionTask implements Runnable {
+
+  private final Path subDir;
+  private final List<Path> baseDirs;
+  private static final FileContext lfs = getLfs();
+
+  private static FileContext getLfs() {
+    try {
+      return FileContext.getLocalFSFileContext();
+    } catch (UnsupportedFileSystemException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Construct a FileDeletionTask with the default INVALID_TASK_ID.
+   *
+   * @param deletionService     the {@link DeletionService}.
+   * @param user                the user deleting the file.
+   * @param subDir              the subdirectory to delete.
+   * @param baseDirs            the base directories containing the subdir.
+   */
+  public FileDeletionTask(DeletionService deletionService, String user,
+      Path subDir, List<Path> baseDirs) {
+    this(INVALID_TASK_ID, deletionService, user, subDir, baseDirs);
+  }
+
+  /**
+   * Construct a FileDeletionTask with the default INVALID_TASK_ID.
+   *
+   * @param taskId              the ID of the task, if previously set.
+   * @param deletionService     the {@link DeletionService}.
+   * @param user                the user deleting the file.
+   * @param subDir              the subdirectory to delete.
+   * @param baseDirs            the base directories containing the subdir.
+   */
+  public FileDeletionTask(int taskId, DeletionService deletionService,
+      String user, Path subDir, List<Path> baseDirs) {
+    super(taskId, deletionService, user, DeletionTaskType.FILE);
+    this.subDir = subDir;
+    this.baseDirs = baseDirs;
+  }
+
+  /**
+   * Get the subdirectory to delete.
+   *
+   * @return the subDir for the FileDeletionTask.
+   */
+  public Path getSubDir() {
+    return this.subDir;
+  }
+
+  /**
+   * Get the base directories containing the subdirectory.
+   *
+   * @return the base directories for the FileDeletionTask.
+   */
+  public List<Path> getBaseDirs() {
+    return this.baseDirs;
+  }
+
+  /**
+   * Delete the specified file/directory as the specified user.
+   */
+  @Override
+  public void run() {
+    if (LOG.isDebugEnabled()) {
+      String msg = String.format("Running DeletionTask : %s", toString());
+      LOG.debug(msg);
+    }
+    boolean error = false;
+    if (null == getUser()) {
+      if (baseDirs == null || baseDirs.size() == 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("NM deleting absolute path : " + subDir);
+        }
+        try {
+          lfs.delete(subDir, true);
+        } catch (IOException e) {
+          error = true;
+          LOG.warn("Failed to delete " + subDir);
+        }
+      } else {
+        for (Path baseDir : baseDirs) {
+          Path del = subDir == null? baseDir : new Path(baseDir, subDir);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("NM deleting path : " + del);
+          }
+          try {
+            lfs.delete(del, true);
+          } catch (IOException e) {
+            error = true;
+            LOG.warn("Failed to delete " + subDir);
+          }
+        }
+      }
+    } else {
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Deleting path: [" + subDir + "] as user: [" + getUser() + "]");
+        }
+        if (baseDirs == null || baseDirs.size() == 0) {
+          getDeletionService().getContainerExecutor().deleteAsUser(
+              new DeletionAsUserContext.Builder()
+              .setUser(getUser())
+              .setSubDir(subDir)
+              .build());
+        } else {
+          getDeletionService().getContainerExecutor().deleteAsUser(
+              new DeletionAsUserContext.Builder()
+              .setUser(getUser())
+              .setSubDir(subDir)
+              .setBasedirs(baseDirs.toArray(new Path[0]))
+              .build());
+        }
+      } catch (IOException|InterruptedException e) {
+        error = true;
+        LOG.warn("Failed to delete as user " + getUser(), e);
+      }
+    }
+    if (error) {
+      setSuccess(!error);
+    }
+    deletionTaskFinished();
+  }
+
+  /**
+   * Convert the FileDeletionTask to a String representation.
+   *
+   * @return String representation of the FileDeletionTask.
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("FileDeletionTask :");
+    sb.append("  id : ").append(getTaskId());
+    sb.append("  user : ").append(getUser());
+    sb.append("  subDir : ").append(
+        subDir == null ? "null" : subDir.toString());
+    sb.append("  baseDir : ");
+    if (baseDirs == null || baseDirs.size() == 0) {
+      sb.append("null");
+    } else {
+      for (Path baseDir : baseDirs) {
+        sb.append(baseDir.toString()).append(',');
+      }
+    }
+    return sb.toString().trim();
+  }
+
+  /**
+   * Convert the FileDeletionTask to the Protobuf representation for storing
+   * in the state store and recovery.
+   *
+   * @return the protobuf representation of the FileDeletionTask.
+   */
+  public DeletionServiceDeleteTaskProto convertDeletionTaskToProto() {
+    DeletionServiceDeleteTaskProto.Builder builder =
+        getBaseDeletionTaskProtoBuilder();
+    builder.setTaskType(DeletionTaskType.FILE.name());
+    if (getSubDir() != null) {
+      builder.setSubdir(getSubDir().toString());
+    }
+    if (getBaseDirs() != null) {
+      for (Path dir : getBaseDirs()) {
+        builder.addBasedirs(dir.toString());
+      }
+    }
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/package-info.java
new file mode 100644
index 0000000..f1a3985
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package containing DeletionTasks for use with the DeletionService.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
index af34e92..47e6a55 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
@@ -113,9 +114,9 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
     this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
     if (this.useLocalCacheDirectoryManager) {
       directoryManagers =
-          new ConcurrentHashMap<Path, LocalCacheDirectoryManager>();
+          new ConcurrentHashMap<>();
       inProgressLocalResourcesMap =
-          new ConcurrentHashMap<LocalResourceRequest, Path>();
+          new ConcurrentHashMap<>();
     }
     this.conf = conf;
     this.stateStore = stateStore;
@@ -393,7 +394,9 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
       return false;
     } else { // ResourceState is LOCALIZED or INIT
       if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
-        delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
+        FileDeletionTask deletionTask = new FileDeletionTask(delService,
+            getUser(), getPathToDelete(rsrc.getLocalPath()), null);
+        delService.delete(deletionTask);
       }
       removeResource(rem.getRequest());
       LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache");
@@ -488,7 +491,9 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
       LOG.warn("Directory " + uniquePath + " already exists, " +
           "try next one.");
       if (delService != null) {
-        delService.delete(getUser(), uniquePath);
+        FileDeletionTask deletionTask = new FileDeletionTask(delService,
+            getUser(), uniquePath, null);
+        delService.delete(deletionTask);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 663bad7..5bc0da7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -95,7 +95,6 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Localize
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
@@ -113,6 +112,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
@@ -604,7 +604,9 @@ public class ResourceLocalizationService extends CompositeService
   private void submitDirForDeletion(String userName, Path dir) {
     try {
       lfs.getFileStatus(dir);
-      delService.delete(userName, dir, new Path[] {});
+      FileDeletionTask deletionTask = new FileDeletionTask(delService, userName,
+          dir, null);
+      delService.delete(deletionTask);
     } catch (UnsupportedFileSystemException ue) {
       LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue);
     } catch (IOException ie) {
@@ -1234,10 +1236,13 @@ public class ResourceLocalizationService extends CompositeService
           event.getResource().unlock();
         }
         if (!paths.isEmpty()) {
-          delService.delete(context.getUser(),
-              null, paths.toArray(new Path[paths.size()]));
+          FileDeletionTask deletionTask = new FileDeletionTask(delService,
+              context.getUser(), null, paths);
+          delService.delete(deletionTask);
         }
-        delService.delete(null, nmPrivateCTokensPath, new Path[] {});
+        FileDeletionTask deletionTask = new FileDeletionTask(delService, null,
+            nmPrivateCTokensPath, null);
+        delService.delete(deletionTask);
       }
     }
 
@@ -1456,7 +1461,9 @@ public class ResourceLocalizationService extends CompositeService
         String appName = fileStatus.getPath().getName();
         if (appName.matches("^application_\\d+_\\d+_DEL_\\d+$")) {
           LOG.info("delete app log dir," + appName);
-          del.delete(null, fileStatus.getPath());
+          FileDeletionTask deletionTask = new FileDeletionTask(del, null,
+              fileStatus.getPath(), null);
+          del.delete(deletionTask);
         }
       }
     }
@@ -1516,7 +1523,9 @@ public class ResourceLocalizationService extends CompositeService
               ||
               status.getPath().getName()
                   .matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*")) {
-            del.delete(null, status.getPath(), new Path[] {});
+            FileDeletionTask deletionTask = new FileDeletionTask(del, null,
+                status.getPath(), null);
+            del.delete(deletionTask);
           }
         } catch (IOException ex) {
           // Do nothing, just give the warning
@@ -1530,24 +1539,25 @@ public class ResourceLocalizationService extends CompositeService
   private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del,
       Path userDirPath) throws IOException {
     RemoteIterator<FileStatus> userDirStatus = lfs.listStatus(userDirPath);
-    FileDeletionTask dependentDeletionTask =
-        del.createFileDeletionTask(null, userDirPath, new Path[] {});
+    FileDeletionTask dependentDeletionTask = new FileDeletionTask(del, null,
+        userDirPath, new ArrayList<Path>());
     if (userDirStatus != null && userDirStatus.hasNext()) {
       List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
       while (userDirStatus.hasNext()) {
         FileStatus status = userDirStatus.next();
         String owner = status.getOwner();
-        FileDeletionTask deletionTask =
-            del.createFileDeletionTask(owner, null,
-              new Path[] { status.getPath() });
-        deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+        List<Path> pathList = new ArrayList<>();
+        pathList.add(status.getPath());
+        FileDeletionTask deletionTask = new FileDeletionTask(del, owner, null,
+            pathList);
+        deletionTask.addDeletionTaskDependency(dependentDeletionTask);
         deletionTasks.add(deletionTask);
       }
       for (FileDeletionTask task : deletionTasks) {
-        del.scheduleFileDeletionTask(task);
+        del.delete(task);
       }
     } else {
-      del.scheduleFileDeletionTask(dependentDeletionTask);
+      del.delete(dependentDeletionTask);
     }
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index f465534..0d9e686 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -69,6 +69,8 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.Times;
@@ -258,19 +260,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       return;
     }
 
-    if (UserGroupInformation.isSecurityEnabled()) {
-      Credentials systemCredentials =
-          context.getSystemCredentialsForApps().get(appId);
-      if (systemCredentials != null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Adding new framework-token for " + appId
-              + " for log-aggregation: " + systemCredentials.getAllTokens()
-              + "; userUgi=" + userUgi);
-        }
-        // this will replace old token
-        userUgi.addCredentials(systemCredentials);
-      }
-    }
+    addCredentials();
 
     // Create a set of Containers whose logs will be uploaded in this cycle.
     // It includes:
@@ -332,9 +322,12 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
             finishedContainers.contains(container));
         if (uploadedFilePathsInThisCycle.size() > 0) {
           uploadedLogsInThisCycle = true;
-          this.delService.delete(this.userUgi.getShortUserName(), null,
-              uploadedFilePathsInThisCycle
-                  .toArray(new Path[uploadedFilePathsInThisCycle.size()]));
+          List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
+          uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
+          DeletionTask deletionTask = new FileDeletionTask(delService,
+              this.userUgi.getShortUserName(), null,
+              uploadedFilePathsInThisCycleList);
+          delService.delete(deletionTask);
         }
 
         // This container is finished, and all its logs have been uploaded,
@@ -352,11 +345,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       }
 
       long currentTime = System.currentTimeMillis();
-      final Path renamedPath = this.rollingMonitorInterval <= 0
-              ? remoteNodeLogFileForApp : new Path(
-                remoteNodeLogFileForApp.getParent(),
-                remoteNodeLogFileForApp.getName() + "_"
-                    + currentTime);
+      final Path renamedPath = getRenamedPath(currentTime);
 
       final boolean rename = uploadedLogsInThisCycle;
       try {
@@ -396,6 +385,28 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     }
   }
 
+  private Path getRenamedPath(long currentTime) {
+    return this.rollingMonitorInterval <= 0 ? remoteNodeLogFileForApp
+        : new Path(remoteNodeLogFileForApp.getParent(),
+        remoteNodeLogFileForApp.getName() + "_" + currentTime);
+  }
+
+  private void addCredentials() {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      Credentials systemCredentials =
+          context.getSystemCredentialsForApps().get(appId);
+      if (systemCredentials != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Adding new framework-token for " + appId
+              + " for log-aggregation: " + systemCredentials.getAllTokens()
+              + "; userUgi=" + userUgi);
+        }
+        // this will replace old token
+        userUgi.addCredentials(systemCredentials);
+      }
+    }
+  }
+
   @VisibleForTesting
   protected LogWriter createLogWriter() {
     return new LogWriter();
@@ -561,8 +572,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     }
 
     if (localAppLogDirs.size() > 0) {
-      this.delService.delete(this.userUgi.getShortUserName(), null,
-        localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
+      List<Path> localAppLogDirsList = new ArrayList<>();
+      localAppLogDirsList.addAll(localAppLogDirs);
+      DeletionTask deletionTask = new FileDeletionTask(delService,
+          this.userUgi.getShortUserName(), null, localAppLogDirsList);
+      this.delService.delete(deletionTask);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
index 2901743..9961748 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
@@ -247,8 +248,10 @@ public class NonAggregatingLogHandler extends AbstractService implements
         new ApplicationEvent(this.applicationId,
           ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
       if (localAppLogDirs.size() > 0) {
-        NonAggregatingLogHandler.this.delService.delete(user, null,
-          (Path[]) localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
+        FileDeletionTask deletionTask = new FileDeletionTask(
+            NonAggregatingLogHandler.this.delService, user, null,
+            localAppLogDirs);
+        NonAggregatingLogHandler.this.delService.delete(deletionTask);
       }
       try {
         NonAggregatingLogHandler.this.stateStore.removeLogDeleter(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
index 7831711..7212953 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
@@ -41,6 +41,7 @@ message DeletionServiceDeleteTaskProto {
   optional int64 deletionTime = 4;
   repeated string basedirs = 5;
   repeated int32 successorIds = 6;
+  optional string taskType = 7;
 }
 
 message LocalizedResourceProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
index 2e0bbe0..87f4a1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
@@ -33,13 +33,14 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.junit.AfterClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+
 public class TestDeletionService {
 
   private static final FileContext lfs = getLfs();
@@ -123,8 +124,9 @@ public class TestDeletionService {
     del.start();
     try {
       for (Path p : dirs) {
-        del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
-            p, null);
+        FileDeletionTask deletionTask = new FileDeletionTask(del,
+            (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, null);
+        del.delete(deletionTask);
       }
 
       int msecToWait = 20 * 1000;
@@ -159,8 +161,10 @@ public class TestDeletionService {
       del.start();
       for (Path p : content) {
         assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p)));
-        del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
-            p, baseDirs.toArray(new Path[4]));
+        FileDeletionTask deletionTask = new FileDeletionTask(del,
+            (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p,
+            baseDirs);
+        del.delete(deletionTask);
       }
 
       int msecToWait = 20 * 1000;
@@ -196,8 +200,9 @@ public class TestDeletionService {
       del.init(conf);
       del.start();
       for (Path p : dirs) {
-        del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p,
-            null);
+        FileDeletionTask deletionTask = new FileDeletionTask(del,
+            (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, null);
+        del.delete(deletionTask);
       }
       int msecToWait = 20 * 1000;
       for (Path p : dirs) {
@@ -220,7 +225,9 @@ public class TestDeletionService {
     try {
       del.init(conf);
       del.start();
-      del.delete("dingo", new Path("/does/not/exist"));
+      FileDeletionTask deletionTask = new FileDeletionTask(del, "dingo",
+          new Path("/does/not/exist"), null);
+      del.delete(deletionTask);
     } finally {
       del.stop();
     }
@@ -247,18 +254,20 @@ public class TestDeletionService {
       // first we will try to delete sub directories which are present. This
       // should then trigger parent directory to be deleted.
       List<Path> subDirs = buildDirs(r, dirs.get(0), 2);
-      
+
       FileDeletionTask dependentDeletionTask =
-          del.createFileDeletionTask(null, dirs.get(0), new Path[] {});
+          new FileDeletionTask(del, null, dirs.get(0), new ArrayList<Path>());
       List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
       for (Path subDir : subDirs) {
+        List<Path> subDirList = new ArrayList<>();
+        subDirList.add(subDir);
         FileDeletionTask deletionTask =
-            del.createFileDeletionTask(null, null, new Path[] { subDir });
-        deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+            new FileDeletionTask(del, null, dirs.get(0), subDirList);
+        deletionTask.addDeletionTaskDependency(dependentDeletionTask);
         deletionTasks.add(deletionTask);
       }
       for (FileDeletionTask task : deletionTasks) {
-        del.scheduleFileDeletionTask(task);
+        del.delete(task);
       }
 
       int msecToWait = 20 * 1000;
@@ -274,19 +283,21 @@ public class TestDeletionService {
       subDirs = buildDirs(r, dirs.get(1), 2);
       subDirs.add(new Path(dirs.get(1), "absentFile"));
       
-      dependentDeletionTask =
-          del.createFileDeletionTask(null, dirs.get(1), new Path[] {});
+      dependentDeletionTask = new FileDeletionTask(del, null, dirs.get(1),
+          new ArrayList<Path>());
       deletionTasks = new ArrayList<FileDeletionTask>();
       for (Path subDir : subDirs) {
-        FileDeletionTask deletionTask =
-            del.createFileDeletionTask(null, null, new Path[] { subDir });
-        deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+        List<Path> subDirList = new ArrayList<>();
+        subDirList.add(subDir);
+        FileDeletionTask deletionTask = new FileDeletionTask(del, null, null,
+            subDirList);
+        deletionTask.addDeletionTaskDependency(dependentDeletionTask);
         deletionTasks.add(deletionTask);
       }
       // marking one of the tasks as a failure.
       deletionTasks.get(2).setSuccess(false);
       for (FileDeletionTask task : deletionTasks) {
-        del.scheduleFileDeletionTask(task);
+        del.delete(task);
       }
 
       msecToWait = 20 * 1000;
@@ -327,8 +338,10 @@ public class TestDeletionService {
       del.start();
       for (Path p : content) {
         assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p)));
-        del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
-            p, baseDirs.toArray(new Path[4]));
+        FileDeletionTask deletionTask = new FileDeletionTask(del,
+            (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p,
+            baseDirs);
+        del.delete(deletionTask);
       }
 
       // restart the deletion service
@@ -341,8 +354,10 @@ public class TestDeletionService {
       // verify paths are still eventually deleted
       int msecToWait = 10 * 1000;
       for (Path p : baseDirs) {
+        System.out.println("TEST Basedir: " + p.getName());
         for (Path q : content) {
           Path fp = new Path(p, q);
+          System.out.println("TEST Path: " + fp.toString());
           while (msecToWait > 0 && lfs.util().exists(fp)) {
             Thread.sleep(100);
             msecToWait -= 100;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org