You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by li...@apache.org on 2017/06/06 08:28:16 UTC
[26/50] [abbrv] hadoop git commit: YARN-6366. Refactor the
NodeManager DeletionService to support additional DeletionTask types.
Contributed by Shane Kumpf.
YARN-6366. Refactor the NodeManager DeletionService to support additional DeletionTask types. Contributed by Shane Kumpf.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/547f18cb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/547f18cb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/547f18cb
Branch: refs/heads/HADOOP-13345
Commit: 547f18cb96aeda55cc19b38be2be4d631b3a5f4f
Parents: 4b4a652
Author: Varun Vasudev <vv...@apache.org>
Authored: Wed May 31 16:15:35 2017 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Wed May 31 16:15:35 2017 +0530
----------------------------------------------------------------------
.../server/nodemanager/DeletionService.java | 468 ++++---------------
.../nodemanager/api/impl/pb/NMProtoUtils.java | 110 +++++
.../nodemanager/api/impl/pb/package-info.java | 25 +
.../recovery/DeletionTaskRecoveryInfo.java | 73 +++
.../deletion/recovery/package-info.java | 25 +
.../deletion/task/DeletionTask.java | 258 ++++++++++
.../deletion/task/DeletionTaskType.java | 24 +
.../deletion/task/FileDeletionTask.java | 202 ++++++++
.../deletion/task/package-info.java | 25 +
.../localizer/LocalResourcesTrackerImpl.java | 13 +-
.../localizer/ResourceLocalizationService.java | 40 +-
.../logaggregation/AppLogAggregatorImpl.java | 60 ++-
.../loghandler/NonAggregatingLogHandler.java | 7 +-
.../yarn_server_nodemanager_recovery.proto | 1 +
.../server/nodemanager/TestDeletionService.java | 57 ++-
.../nodemanager/TestNodeManagerReboot.java | 99 +---
.../api/impl/pb/TestNMProtoUtils.java | 91 ++++
.../BaseContainerManagerTest.java | 7 +-
.../deletion/task/FileDeletionMatcher.java | 84 ++++
.../deletion/task/TestFileDeletionTask.java | 85 ++++
.../TestLocalResourcesTrackerImpl.java | 5 +-
.../TestResourceLocalizationService.java | 33 +-
.../TestAppLogAggregatorImpl.java | 15 +-
.../TestLogAggregationService.java | 17 +-
.../TestNonAggregatingLogHandler.java | 8 +-
25 files changed, 1274 insertions(+), 558 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
index aac0af9..38d69a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
@@ -21,11 +21,8 @@ package org.apache.hadoop.yarn.server.nodemanager;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -38,461 +35,176 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
-import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
+import org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.NMProtoUtils;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery.DeletionTaskRecoveryInfo;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class DeletionService extends AbstractService {
- static final Log LOG = LogFactory.getLog(DeletionService.class);
+
+ private static final Log LOG = LogFactory.getLog(DeletionService.class);
+
private int debugDelay;
- private final ContainerExecutor exec;
- private ScheduledThreadPoolExecutor sched;
- private static final FileContext lfs = getLfs();
+ private final ContainerExecutor containerExecutor;
private final NMStateStoreService stateStore;
+ private ScheduledThreadPoolExecutor sched;
private AtomicInteger nextTaskId = new AtomicInteger(0);
- static final FileContext getLfs() {
- try {
- return FileContext.getLocalFSFileContext();
- } catch (UnsupportedFileSystemException e) {
- throw new RuntimeException(e);
- }
- }
-
public DeletionService(ContainerExecutor exec) {
this(exec, new NMNullStateStoreService());
}
- public DeletionService(ContainerExecutor exec,
+ public DeletionService(ContainerExecutor containerExecutor,
NMStateStoreService stateStore) {
super(DeletionService.class.getName());
- this.exec = exec;
+ this.containerExecutor = containerExecutor;
this.debugDelay = 0;
this.stateStore = stateStore;
}
-
- /**
- * Delete the path(s) as this user.
- * @param user The user to delete as, or the JVM user if null
- * @param subDir the sub directory name
- * @param baseDirs the base directories which contains the subDir's
- */
- public void delete(String user, Path subDir, Path... baseDirs) {
- // TODO if parent owned by NM, rename within parent inline
- if (debugDelay != -1) {
- List<Path> baseDirList = null;
- if (baseDirs != null && baseDirs.length != 0) {
- baseDirList = Arrays.asList(baseDirs);
- }
- FileDeletionTask task =
- new FileDeletionTask(this, user, subDir, baseDirList);
- recordDeletionTaskInStateStore(task);
- sched.schedule(task, debugDelay, TimeUnit.SECONDS);
- }
- }
-
- public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) {
- if (debugDelay != -1) {
- recordDeletionTaskInStateStore(fileDeletionTask);
- sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS);
- }
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- ThreadFactory tf = new ThreadFactoryBuilder()
- .setNameFormat("DeletionService #%d")
- .build();
- if (conf != null) {
- sched = new HadoopScheduledThreadPoolExecutor(
- conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
- YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf);
- debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
- } else {
- sched = new HadoopScheduledThreadPoolExecutor(
- YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf);
- }
- sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- sched.setKeepAliveTime(60L, SECONDS);
- if (stateStore.canRecover()) {
- recover(stateStore.loadDeletionServiceState());
- }
- super.serviceInit(conf);
- }
- @Override
- protected void serviceStop() throws Exception {
- if (sched != null) {
- sched.shutdown();
- boolean terminated = false;
- try {
- terminated = sched.awaitTermination(10, SECONDS);
- } catch (InterruptedException e) {
- }
- if (terminated != true) {
- sched.shutdownNow();
- }
- }
- super.serviceStop();
+ public int getDebugDelay() {
+ return debugDelay;
}
- /**
- * Determine if the service has completely stopped.
- * Used only by unit tests
- * @return true if service has completely stopped
- */
- @Private
- public boolean isTerminated() {
- return getServiceState() == STATE.STOPPED && sched.isTerminated();
+ public ContainerExecutor getContainerExecutor() {
+ return containerExecutor;
}
- public static class FileDeletionTask implements Runnable {
- public static final int INVALID_TASK_ID = -1;
- private int taskId;
- private final String user;
- private final Path subDir;
- private final List<Path> baseDirs;
- private final AtomicInteger numberOfPendingPredecessorTasks;
- private final Set<FileDeletionTask> successorTaskSet;
- private final DeletionService delService;
- // By default all tasks will start as success=true; however if any of
- // the dependent task fails then it will be marked as false in
- // fileDeletionTaskFinished().
- private boolean success;
-
- private FileDeletionTask(DeletionService delService, String user,
- Path subDir, List<Path> baseDirs) {
- this(INVALID_TASK_ID, delService, user, subDir, baseDirs);
- }
-
- private FileDeletionTask(int taskId, DeletionService delService,
- String user, Path subDir, List<Path> baseDirs) {
- this.taskId = taskId;
- this.delService = delService;
- this.user = user;
- this.subDir = subDir;
- this.baseDirs = baseDirs;
- this.successorTaskSet = new HashSet<FileDeletionTask>();
- this.numberOfPendingPredecessorTasks = new AtomicInteger(0);
- success = true;
- }
-
- /**
- * increments and returns pending predecessor task count
- */
- public int incrementAndGetPendingPredecessorTasks() {
- return numberOfPendingPredecessorTasks.incrementAndGet();
- }
-
- /**
- * decrements and returns pending predecessor task count
- */
- public int decrementAndGetPendingPredecessorTasks() {
- return numberOfPendingPredecessorTasks.decrementAndGet();
- }
-
- @VisibleForTesting
- public String getUser() {
- return this.user;
- }
-
- @VisibleForTesting
- public Path getSubDir() {
- return this.subDir;
- }
-
- @VisibleForTesting
- public List<Path> getBaseDirs() {
- return this.baseDirs;
- }
-
- public synchronized void setSuccess(boolean success) {
- this.success = success;
- }
-
- public synchronized boolean getSucess() {
- return this.success;
- }
-
- public synchronized FileDeletionTask[] getSuccessorTasks() {
- FileDeletionTask[] successors =
- new FileDeletionTask[successorTaskSet.size()];
- return successorTaskSet.toArray(successors);
- }
+ public NMStateStoreService getStateStore() {
+ return stateStore;
+ }
- @Override
- public void run() {
+ public void delete(DeletionTask deletionTask) {
+ if (debugDelay != -1) {
if (LOG.isDebugEnabled()) {
- LOG.debug(this);
- }
- boolean error = false;
- if (null == user) {
- if (baseDirs == null || baseDirs.size() == 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("NM deleting absolute path : " + subDir);
- }
- try {
- lfs.delete(subDir, true);
- } catch (IOException e) {
- error = true;
- LOG.warn("Failed to delete " + subDir);
- }
- } else {
- for (Path baseDir : baseDirs) {
- Path del = subDir == null? baseDir : new Path(baseDir, subDir);
- if (LOG.isDebugEnabled()) {
- LOG.debug("NM deleting path : " + del);
- }
- try {
- lfs.delete(del, true);
- } catch (IOException e) {
- error = true;
- LOG.warn("Failed to delete " + subDir);
- }
- }
- }
- } else {
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Deleting path: [" + subDir + "] as user: [" + user + "]");
- }
- if (baseDirs == null || baseDirs.size() == 0) {
- delService.exec.deleteAsUser(new DeletionAsUserContext.Builder()
- .setUser(user)
- .setSubDir(subDir)
- .build());
- } else {
- delService.exec.deleteAsUser(new DeletionAsUserContext.Builder()
- .setUser(user)
- .setSubDir(subDir)
- .setBasedirs(baseDirs.toArray(new Path[0]))
- .build());
- }
- } catch (IOException e) {
- error = true;
- LOG.warn("Failed to delete as user " + user, e);
- } catch (InterruptedException e) {
- error = true;
- LOG.warn("Failed to delete as user " + user, e);
- }
- }
- if (error) {
- setSuccess(!error);
- }
- fileDeletionTaskFinished();
- }
-
- @Override
- public String toString() {
- StringBuffer sb = new StringBuffer("\nFileDeletionTask : ");
- sb.append(" user : ").append(this.user);
- sb.append(" subDir : ").append(
- subDir == null ? "null" : subDir.toString());
- sb.append(" baseDir : ");
- if (baseDirs == null || baseDirs.size() == 0) {
- sb.append("null");
- } else {
- for (Path baseDir : baseDirs) {
- sb.append(baseDir.toString()).append(',');
- }
- }
- return sb.toString();
- }
-
- /**
- * If there is a task dependency between say tasks 1,2,3 such that
- * task2 and task3 can be started only after task1 then we should define
- * task2 and task3 as successor tasks for task1.
- * Note:- Task dependency should be defined prior to
- * @param successorTask
- */
- public synchronized void addFileDeletionTaskDependency(
- FileDeletionTask successorTask) {
- if (successorTaskSet.add(successorTask)) {
- successorTask.incrementAndGetPendingPredecessorTasks();
+ String msg = String.format("Scheduling DeletionTask (delay %d) : %s",
+ debugDelay, deletionTask.toString());
+ LOG.debug(msg);
}
+ recordDeletionTaskInStateStore(deletionTask);
+ sched.schedule(deletionTask, debugDelay, TimeUnit.SECONDS);
}
-
- /*
- * This is called when
- * 1) Current file deletion task ran and finished.
- * 2) This can be even directly called by predecessor task if one of the
- * dependent tasks of it has failed marking its success = false.
- */
- private synchronized void fileDeletionTaskFinished() {
- try {
- delService.stateStore.removeDeletionTask(taskId);
- } catch (IOException e) {
- LOG.error("Unable to remove deletion task " + taskId
- + " from state store", e);
- }
- Iterator<FileDeletionTask> successorTaskI =
- this.successorTaskSet.iterator();
- while (successorTaskI.hasNext()) {
- FileDeletionTask successorTask = successorTaskI.next();
- if (!success) {
- successorTask.setSuccess(success);
- }
- int count = successorTask.decrementAndGetPendingPredecessorTasks();
- if (count == 0) {
- if (successorTask.getSucess()) {
- successorTask.delService.scheduleFileDeletionTask(successorTask);
- } else {
- successorTask.fileDeletionTaskFinished();
- }
- }
- }
- }
- }
-
- /**
- * Helper method to create file deletion task. To be used only if we need
- * a way to define dependencies between deletion tasks.
- * @param user user on whose behalf this task is suppose to run
- * @param subDir sub directory as required in
- * {@link DeletionService#delete(String, Path, Path...)}
- * @param baseDirs base directories as required in
- * {@link DeletionService#delete(String, Path, Path...)}
- */
- public FileDeletionTask createFileDeletionTask(String user, Path subDir,
- Path[] baseDirs) {
- return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs));
}
- private void recover(RecoveredDeletionServiceState state)
+ private void recover(NMStateStoreService.RecoveredDeletionServiceState state)
throws IOException {
List<DeletionServiceDeleteTaskProto> taskProtos = state.getTasks();
Map<Integer, DeletionTaskRecoveryInfo> idToInfoMap =
- new HashMap<Integer, DeletionTaskRecoveryInfo>(taskProtos.size());
- Set<Integer> successorTasks = new HashSet<Integer>();
+ new HashMap<>(taskProtos.size());
+ Set<Integer> successorTasks = new HashSet<>();
for (DeletionServiceDeleteTaskProto proto : taskProtos) {
- DeletionTaskRecoveryInfo info = parseTaskProto(proto);
- idToInfoMap.put(info.task.taskId, info);
- nextTaskId.set(Math.max(nextTaskId.get(), info.task.taskId));
- successorTasks.addAll(info.successorTaskIds);
+ DeletionTaskRecoveryInfo info =
+ NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this);
+ idToInfoMap.put(info.getTask().getTaskId(), info);
+ nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId()));
+ successorTasks.addAll(info.getSuccessorTaskIds());
}
// restore the task dependencies and schedule the deletion tasks that
// have no predecessors
final long now = System.currentTimeMillis();
for (DeletionTaskRecoveryInfo info : idToInfoMap.values()) {
- for (Integer successorId : info.successorTaskIds){
+ for (Integer successorId : info.getSuccessorTaskIds()){
DeletionTaskRecoveryInfo successor = idToInfoMap.get(successorId);
if (successor != null) {
- info.task.addFileDeletionTaskDependency(successor.task);
+ info.getTask().addDeletionTaskDependency(successor.getTask());
} else {
LOG.error("Unable to locate dependency task for deletion task "
- + info.task.taskId + " at " + info.task.getSubDir());
+ + info.getTask().getTaskId());
}
}
- if (!successorTasks.contains(info.task.taskId)) {
- long msecTilDeletion = info.deletionTimestamp - now;
- sched.schedule(info.task, msecTilDeletion, TimeUnit.MILLISECONDS);
+ if (!successorTasks.contains(info.getTask().getTaskId())) {
+ long msecTilDeletion = info.getDeletionTimestamp() - now;
+ sched.schedule(info.getTask(), msecTilDeletion, TimeUnit.MILLISECONDS);
}
}
}
- private DeletionTaskRecoveryInfo parseTaskProto(
- DeletionServiceDeleteTaskProto proto) throws IOException {
- int taskId = proto.getId();
- String user = proto.hasUser() ? proto.getUser() : null;
- Path subdir = null;
- List<Path> basePaths = null;
- if (proto.hasSubdir()) {
- subdir = new Path(proto.getSubdir());
- }
- List<String> basedirs = proto.getBasedirsList();
- if (basedirs != null && basedirs.size() > 0) {
- basePaths = new ArrayList<Path>(basedirs.size());
- for (String basedir : basedirs) {
- basePaths.add(new Path(basedir));
- }
- }
-
- FileDeletionTask task = new FileDeletionTask(taskId, this, user,
- subdir, basePaths);
- return new DeletionTaskRecoveryInfo(task,
- proto.getSuccessorIdsList(),
- proto.getDeletionTime());
- }
-
private int generateTaskId() {
// get the next ID but avoid an invalid ID
int taskId = nextTaskId.incrementAndGet();
- while (taskId == FileDeletionTask.INVALID_TASK_ID) {
+ while (taskId == DeletionTask.INVALID_TASK_ID) {
taskId = nextTaskId.incrementAndGet();
}
return taskId;
}
- private void recordDeletionTaskInStateStore(FileDeletionTask task) {
+ private void recordDeletionTaskInStateStore(DeletionTask task) {
if (!stateStore.canRecover()) {
// optimize the case where we aren't really recording
return;
}
- if (task.taskId != FileDeletionTask.INVALID_TASK_ID) {
+ if (task.getTaskId() != DeletionTask.INVALID_TASK_ID) {
return; // task already recorded
}
- task.taskId = generateTaskId();
-
- FileDeletionTask[] successors = task.getSuccessorTasks();
+ task.setTaskId(generateTaskId());
// store successors first to ensure task IDs have been generated for them
- for (FileDeletionTask successor : successors) {
+ DeletionTask[] successors = task.getSuccessorTasks();
+ for (DeletionTask successor : successors) {
recordDeletionTaskInStateStore(successor);
}
- DeletionServiceDeleteTaskProto.Builder builder =
- DeletionServiceDeleteTaskProto.newBuilder();
- builder.setId(task.taskId);
- if (task.getUser() != null) {
- builder.setUser(task.getUser());
- }
- if (task.getSubDir() != null) {
- builder.setSubdir(task.getSubDir().toString());
- }
- builder.setDeletionTime(System.currentTimeMillis() +
- TimeUnit.MILLISECONDS.convert(debugDelay, TimeUnit.SECONDS));
- if (task.getBaseDirs() != null) {
- for (Path dir : task.getBaseDirs()) {
- builder.addBasedirs(dir.toString());
- }
- }
- for (FileDeletionTask successor : successors) {
- builder.addSuccessorIds(successor.taskId);
- }
-
try {
- stateStore.storeDeletionTask(task.taskId, builder.build());
+ stateStore.storeDeletionTask(task.getTaskId(),
+ task.convertDeletionTaskToProto());
} catch (IOException e) {
- LOG.error("Unable to store deletion task " + task.taskId + " for "
- + task.getSubDir(), e);
+ LOG.error("Unable to store deletion task " + task.getTaskId(), e);
}
}
- private static class DeletionTaskRecoveryInfo {
- FileDeletionTask task;
- List<Integer> successorTaskIds;
- long deletionTimestamp;
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ ThreadFactory tf = new ThreadFactoryBuilder()
+ .setNameFormat("DeletionService #%d")
+ .build();
+ if (conf != null) {
+ sched = new HadoopScheduledThreadPoolExecutor(
+ conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf);
+ debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
+ } else {
+ sched = new HadoopScheduledThreadPoolExecutor(
+ YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf);
+ }
+ sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ sched.setKeepAliveTime(60L, SECONDS);
+ if (stateStore.canRecover()) {
+ recover(stateStore.loadDeletionServiceState());
+ }
+ super.serviceInit(conf);
+ }
- public DeletionTaskRecoveryInfo(FileDeletionTask task,
- List<Integer> successorTaskIds, long deletionTimestamp) {
- this.task = task;
- this.successorTaskIds = successorTaskIds;
- this.deletionTimestamp = deletionTimestamp;
+ @Override
+ public void serviceStop() throws Exception {
+ if (sched != null) {
+ sched.shutdown();
+ boolean terminated = false;
+ try {
+ terminated = sched.awaitTermination(10, SECONDS);
+ } catch (InterruptedException e) { }
+ if (!terminated) {
+ sched.shutdownNow();
+ }
}
+ super.serviceStop();
+ }
+
+ /**
+ * Determine if the service has completely stopped.
+ * Used only by unit tests
+ * @return true if service has completely stopped
+ */
+ @Private
+ public boolean isTerminated() {
+ return getServiceState() == STATE.STOPPED && sched.isTerminated();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/NMProtoUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/NMProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/NMProtoUtils.java
new file mode 100644
index 0000000..e47b3ee
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/NMProtoUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery.DeletionTaskRecoveryInfo;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTaskType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utilities for converting from PB representations.
+ */
+public final class NMProtoUtils {
+
+ private static final Log LOG = LogFactory.getLog(NMProtoUtils.class);
+
+ private NMProtoUtils() { }
+
+ /**
+ * Convert the Protobuf representation into a {@link DeletionTask}.
+ *
+ * @param proto the Protobuf representation for the DeletionTask
+ * @param deletionService the {@link DeletionService}
+ * @return the converted {@link DeletionTask}
+ */
+ public static DeletionTask convertProtoToDeletionTask(
+ DeletionServiceDeleteTaskProto proto, DeletionService deletionService) {
+ int taskId = proto.getId();
+ if (proto.hasTaskType() && proto.getTaskType() != null) {
+ if (proto.getTaskType().equals(DeletionTaskType.FILE.name())) {
+ LOG.debug("Converting recovered FileDeletionTask");
+ return convertProtoToFileDeletionTask(proto, deletionService, taskId);
+ }
+ }
+ LOG.debug("Unable to get task type, trying FileDeletionTask");
+ return convertProtoToFileDeletionTask(proto, deletionService, taskId);
+ }
+
+ /**
+ * Convert the Protobuf representation into the {@link FileDeletionTask}.
+ *
+ * @param proto the Protobuf representation of the {@link FileDeletionTask}
+ * @param deletionService the {@link DeletionService}.
+ * @param taskId the ID of the {@link DeletionTask}.
+ * @return the populated {@link FileDeletionTask}.
+ */
+ public static FileDeletionTask convertProtoToFileDeletionTask(
+ DeletionServiceDeleteTaskProto proto, DeletionService deletionService,
+ int taskId) {
+ String user = proto.hasUser() ? proto.getUser() : null;
+ Path subdir = null;
+ if (proto.hasSubdir()) {
+ subdir = new Path(proto.getSubdir());
+ }
+ List<Path> basePaths = null;
+ List<String> basedirs = proto.getBasedirsList();
+ if (basedirs != null && basedirs.size() > 0) {
+ basePaths = new ArrayList<>(basedirs.size());
+ for (String basedir : basedirs) {
+ basePaths.add(new Path(basedir));
+ }
+ }
+ return new FileDeletionTask(taskId, deletionService, user, subdir,
+ basePaths);
+ }
+
+ /**
+ * Convert the Protobuf representation to the {@link DeletionTaskRecoveryInfo}
+ * representation.
+ *
+ * @param proto the Protobuf representation of the {@link DeletionTask}
+ * @param deletionService the {@link DeletionService}
+ * @return the populated {@link DeletionTaskRecoveryInfo}
+ */
+ public static DeletionTaskRecoveryInfo convertProtoToDeletionTaskRecoveryInfo(
+ DeletionServiceDeleteTaskProto proto, DeletionService deletionService) {
+ DeletionTask deletionTask =
+ NMProtoUtils.convertProtoToDeletionTask(proto, deletionService);
+ List<Integer> successorTaskIds = new ArrayList<>();
+ if (proto.getSuccessorIdsList() != null &&
+ !proto.getSuccessorIdsList().isEmpty()) {
+ successorTaskIds = proto.getSuccessorIdsList();
+ }
+ long deletionTimestamp = proto.getDeletionTime();
+ return new DeletionTaskRecoveryInfo(deletionTask, successorTaskIds,
+ deletionTimestamp);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/package-info.java
new file mode 100644
index 0000000..006f49f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package containing classes for working with Protobuf.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/DeletionTaskRecoveryInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/DeletionTaskRecoveryInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/DeletionTaskRecoveryInfo.java
new file mode 100644
index 0000000..c62ea02
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/DeletionTaskRecoveryInfo.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
+
+import java.util.List;
+
+/**
+ * Encapsulates the recovery info needed to recover a DeletionTask from the NM
+ * state store.
+ */
+public class DeletionTaskRecoveryInfo {
+
+ private DeletionTask task;
+ private List<Integer> successorTaskIds;
+ private long deletionTimestamp;
+
+ /**
+ * Information needed for recovering the DeletionTask.
+ *
+ * @param task the DeletionTask
+ * @param successorTaskIds the dependent DeletionTasks.
+ * @param deletionTimestamp the scheduled times of deletion.
+ */
+ public DeletionTaskRecoveryInfo(DeletionTask task,
+ List<Integer> successorTaskIds, long deletionTimestamp) {
+ this.task = task;
+ this.successorTaskIds = successorTaskIds;
+ this.deletionTimestamp = deletionTimestamp;
+ }
+
+ /**
+ * Return the recovered DeletionTask.
+ *
+ * @return the recovered DeletionTask.
+ */
+ public DeletionTask getTask() {
+ return task;
+ }
+
+ /**
+ * Return all of the dependent DeletionTasks.
+ *
+ * @return the dependent DeletionTasks.
+ */
+ public List<Integer> getSuccessorTaskIds() {
+ return successorTaskIds;
+ }
+
+ /**
+ * Return the deletion timestamp.
+ *
+ * @return the deletion timestamp.
+ */
+ public long getDeletionTimestamp() {
+ return deletionTimestamp;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/package-info.java
new file mode 100644
index 0000000..28d7f62
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package containing classes for recovering DeletionTasks.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTask.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTask.java
new file mode 100644
index 0000000..635d7a9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTask.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * DeletionTasks are supplied to the {@link DeletionService} for deletion.
+ */
+public abstract class DeletionTask implements Runnable {
+
+ static final Log LOG = LogFactory.getLog(DeletionTask.class);
+
+ public static final int INVALID_TASK_ID = -1;
+
+ private int taskId;
+ private String user;
+ private DeletionTaskType deletionTaskType;
+ private DeletionService deletionService;
+ private final AtomicInteger numberOfPendingPredecessorTasks;
+ private final Set<DeletionTask> successorTaskSet;
+ // By default all tasks will start as success=true; however if any of
+ // the dependent task fails then it will be marked as false in
+ // deletionTaskFinished().
+ private boolean success;
+
+ /**
+ * Deletion task with taskId and default values.
+ *
+ * @param taskId the ID of the task, if previously set.
+ * @param deletionService the {@link DeletionService}.
+ * @param user the user associated with the delete.
+ * @param deletionTaskType the {@link DeletionTaskType}.
+ */
+ public DeletionTask(int taskId, DeletionService deletionService, String user,
+ DeletionTaskType deletionTaskType) {
+ this(taskId, deletionService, user, new AtomicInteger(0),
+ new HashSet<DeletionTask>(), deletionTaskType);
+ }
+
+ /**
+ * Deletion task with taskId and user supplied values.
+ *
+ * @param taskId the ID of the task, if previously set.
+ * @param deletionService the {@link DeletionService}.
+ * @param user the user associated with the delete.
+ * @param numberOfPendingPredecessorTasks Number of pending tasks.
+ * @param successorTaskSet the list of successor DeletionTasks
+ * @param deletionTaskType the {@link DeletionTaskType}.
+ */
+ public DeletionTask(int taskId, DeletionService deletionService, String user,
+ AtomicInteger numberOfPendingPredecessorTasks,
+ Set<DeletionTask> successorTaskSet, DeletionTaskType deletionTaskType) {
+ this.taskId = taskId;
+ this.deletionService = deletionService;
+ this.user = user;
+ this.numberOfPendingPredecessorTasks = numberOfPendingPredecessorTasks;
+ this.successorTaskSet = successorTaskSet;
+ this.deletionTaskType = deletionTaskType;
+ success = true;
+ }
+
+ /**
+ * Get the taskId for the DeletionTask.
+ *
+ * @return the taskId.
+ */
+ public int getTaskId() {
+ return taskId;
+ }
+
+ /**
+ * Set the taskId for the DeletionTask.
+ *
+ * @param taskId the taskId.
+ */
+ public void setTaskId(int taskId) {
+ this.taskId = taskId;
+ }
+
+ /**
+ * The the user assoicated with the DeletionTask.
+ *
+ * @return the user name.
+ */
+ public String getUser() {
+ return user;
+ }
+
+ /**
+ * Get the {@link DeletionService} for this DeletionTask.
+ *
+ * @return the {@link DeletionService}.
+ */
+ public DeletionService getDeletionService() {
+ return deletionService;
+ }
+
+ /**
+ * Get the {@link DeletionTaskType} for this DeletionTask.
+ *
+ * @return the {@link DeletionTaskType}.
+ */
+ public DeletionTaskType getDeletionTaskType() {
+ return deletionTaskType;
+ }
+
+ /**
+ * Set the DeletionTask run status.
+ *
+ * @param success the status of the running DeletionTask.
+ */
+ public synchronized void setSuccess(boolean success) {
+ this.success = success;
+ }
+
+ /**
+ * Return the DeletionTask run status.
+ *
+ * @return the status of the running DeletionTask.
+ */
+ public synchronized boolean getSucess() {
+ return this.success;
+ }
+
+ /**
+ * Return the list of successor tasks for the DeletionTask.
+ *
+ * @return the list of successor tasks.
+ */
+ public synchronized DeletionTask[] getSuccessorTasks() {
+ DeletionTask[] successors = new DeletionTask[successorTaskSet.size()];
+ return successorTaskSet.toArray(successors);
+ }
+
+ /**
+ * Convert the DeletionTask to the Protobuf representation for storing in the
+ * state store and recovery.
+ *
+ * @return the protobuf representation of the DeletionTask.
+ */
+ public abstract DeletionServiceDeleteTaskProto convertDeletionTaskToProto();
+
+ /**
+ * Add a dependent DeletionTask.
+ *
+ * If there is a task dependency between say tasks 1,2,3 such that
+ * task2 and task3 can be started only after task1 then we should define
+ * task2 and task3 as successor tasks for task1.
+ * Note:- Task dependency should be defined prior to calling delete.
+ *
+ * @param successorTask the DeletionTask the depends on this DeletionTask.
+ */
+ public synchronized void addDeletionTaskDependency(
+ DeletionTask successorTask) {
+ if (successorTaskSet.add(successorTask)) {
+ successorTask.incrementAndGetPendingPredecessorTasks();
+ }
+ }
+
+ /**
+ * Increments and returns pending predecessor task count.
+ *
+ * @return the number of pending predecessor DeletionTasks.
+ */
+ public int incrementAndGetPendingPredecessorTasks() {
+ return numberOfPendingPredecessorTasks.incrementAndGet();
+ }
+
+ /**
+ * Decrements and returns pending predecessor task count.
+ *
+ * @return the number of pending predecessor DeletionTasks.
+ */
+ public int decrementAndGetPendingPredecessorTasks() {
+ return numberOfPendingPredecessorTasks.decrementAndGet();
+ }
+
+ /**
+ * Removes the DeletionTask from the state store and validates that successor
+ * tasks have been scheduled and completed.
+ *
+ * This is called when:
+ * 1) Current deletion task ran and finished.
+ * 2) When directly called by predecessor task if one of the
+ * dependent tasks of it has failed marking its success = false.
+ */
+ synchronized void deletionTaskFinished() {
+ try {
+ NMStateStoreService stateStore = deletionService.getStateStore();
+ stateStore.removeDeletionTask(taskId);
+ } catch (IOException e) {
+ LOG.error("Unable to remove deletion task " + taskId
+ + " from state store", e);
+ }
+ Iterator<DeletionTask> successorTaskI = this.successorTaskSet.iterator();
+ while (successorTaskI.hasNext()) {
+ DeletionTask successorTask = successorTaskI.next();
+ if (!success) {
+ successorTask.setSuccess(success);
+ }
+ int count = successorTask.decrementAndGetPendingPredecessorTasks();
+ if (count == 0) {
+ if (successorTask.getSucess()) {
+ successorTask.deletionService.delete(successorTask);
+ } else {
+ successorTask.deletionTaskFinished();
+ }
+ }
+ }
+ }
+
+ /**
+ * Return the Protobuf builder with the base DeletionTask attributes.
+ *
+ * @return pre-populated Buidler with the base attributes.
+ */
+ DeletionServiceDeleteTaskProto.Builder getBaseDeletionTaskProtoBuilder() {
+ DeletionServiceDeleteTaskProto.Builder builder =
+ DeletionServiceDeleteTaskProto.newBuilder();
+ builder.setId(getTaskId());
+ if (getUser() != null) {
+ builder.setUser(getUser());
+ }
+ builder.setDeletionTime(System.currentTimeMillis() +
+ TimeUnit.MILLISECONDS.convert(getDeletionService().getDebugDelay(),
+ TimeUnit.SECONDS));
+ for (DeletionTask successor : getSuccessorTasks()) {
+ builder.addSuccessorIds(successor.getTaskId());
+ }
+ return builder;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTaskType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTaskType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTaskType.java
new file mode 100644
index 0000000..676c71b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTaskType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task;
+
+/**
+ * Available types of {@link DeletionTask}s.
+ */
+public enum DeletionTaskType {
+ FILE
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java
new file mode 100644
index 0000000..fd07f16
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link DeletionTask} handling the removal of files (and directories).
+ */
+public class FileDeletionTask extends DeletionTask implements Runnable {
+
+ private final Path subDir;
+ private final List<Path> baseDirs;
+ private static final FileContext lfs = getLfs();
+
+ private static FileContext getLfs() {
+ try {
+ return FileContext.getLocalFSFileContext();
+ } catch (UnsupportedFileSystemException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Construct a FileDeletionTask with the default INVALID_TASK_ID.
+ *
+ * @param deletionService the {@link DeletionService}.
+ * @param user the user deleting the file.
+ * @param subDir the subdirectory to delete.
+ * @param baseDirs the base directories containing the subdir.
+ */
+ public FileDeletionTask(DeletionService deletionService, String user,
+ Path subDir, List<Path> baseDirs) {
+ this(INVALID_TASK_ID, deletionService, user, subDir, baseDirs);
+ }
+
+ /**
+ * Construct a FileDeletionTask with the default INVALID_TASK_ID.
+ *
+ * @param taskId the ID of the task, if previously set.
+ * @param deletionService the {@link DeletionService}.
+ * @param user the user deleting the file.
+ * @param subDir the subdirectory to delete.
+ * @param baseDirs the base directories containing the subdir.
+ */
+ public FileDeletionTask(int taskId, DeletionService deletionService,
+ String user, Path subDir, List<Path> baseDirs) {
+ super(taskId, deletionService, user, DeletionTaskType.FILE);
+ this.subDir = subDir;
+ this.baseDirs = baseDirs;
+ }
+
+ /**
+ * Get the subdirectory to delete.
+ *
+ * @return the subDir for the FileDeletionTask.
+ */
+ public Path getSubDir() {
+ return this.subDir;
+ }
+
+ /**
+ * Get the base directories containing the subdirectory.
+ *
+ * @return the base directories for the FileDeletionTask.
+ */
+ public List<Path> getBaseDirs() {
+ return this.baseDirs;
+ }
+
+ /**
+ * Delete the specified file/directory as the specified user.
+ */
+ @Override
+ public void run() {
+ if (LOG.isDebugEnabled()) {
+ String msg = String.format("Running DeletionTask : %s", toString());
+ LOG.debug(msg);
+ }
+ boolean error = false;
+ if (null == getUser()) {
+ if (baseDirs == null || baseDirs.size() == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("NM deleting absolute path : " + subDir);
+ }
+ try {
+ lfs.delete(subDir, true);
+ } catch (IOException e) {
+ error = true;
+ LOG.warn("Failed to delete " + subDir);
+ }
+ } else {
+ for (Path baseDir : baseDirs) {
+ Path del = subDir == null? baseDir : new Path(baseDir, subDir);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("NM deleting path : " + del);
+ }
+ try {
+ lfs.delete(del, true);
+ } catch (IOException e) {
+ error = true;
+ LOG.warn("Failed to delete " + subDir);
+ }
+ }
+ }
+ } else {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Deleting path: [" + subDir + "] as user: [" + getUser() + "]");
+ }
+ if (baseDirs == null || baseDirs.size() == 0) {
+ getDeletionService().getContainerExecutor().deleteAsUser(
+ new DeletionAsUserContext.Builder()
+ .setUser(getUser())
+ .setSubDir(subDir)
+ .build());
+ } else {
+ getDeletionService().getContainerExecutor().deleteAsUser(
+ new DeletionAsUserContext.Builder()
+ .setUser(getUser())
+ .setSubDir(subDir)
+ .setBasedirs(baseDirs.toArray(new Path[0]))
+ .build());
+ }
+ } catch (IOException|InterruptedException e) {
+ error = true;
+ LOG.warn("Failed to delete as user " + getUser(), e);
+ }
+ }
+ if (error) {
+ setSuccess(!error);
+ }
+ deletionTaskFinished();
+ }
+
+ /**
+ * Convert the FileDeletionTask to a String representation.
+ *
+ * @return String representation of the FileDeletionTask.
+ */
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("FileDeletionTask :");
+ sb.append(" id : ").append(getTaskId());
+ sb.append(" user : ").append(getUser());
+ sb.append(" subDir : ").append(
+ subDir == null ? "null" : subDir.toString());
+ sb.append(" baseDir : ");
+ if (baseDirs == null || baseDirs.size() == 0) {
+ sb.append("null");
+ } else {
+ for (Path baseDir : baseDirs) {
+ sb.append(baseDir.toString()).append(',');
+ }
+ }
+ return sb.toString().trim();
+ }
+
+ /**
+ * Convert the FileDeletionTask to the Protobuf representation for storing
+ * in the state store and recovery.
+ *
+ * @return the protobuf representation of the FileDeletionTask.
+ */
+ public DeletionServiceDeleteTaskProto convertDeletionTaskToProto() {
+ DeletionServiceDeleteTaskProto.Builder builder =
+ getBaseDeletionTaskProtoBuilder();
+ builder.setTaskType(DeletionTaskType.FILE.name());
+ if (getSubDir() != null) {
+ builder.setSubdir(getSubDir().toString());
+ }
+ if (getBaseDirs() != null) {
+ for (Path dir : getBaseDirs()) {
+ builder.addBasedirs(dir.toString());
+ }
+ }
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/package-info.java
new file mode 100644
index 0000000..f1a3985
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package containing DeletionTasks for use with the DeletionService.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
index af34e92..47e6a55 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
@@ -113,9 +114,9 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
if (this.useLocalCacheDirectoryManager) {
directoryManagers =
- new ConcurrentHashMap<Path, LocalCacheDirectoryManager>();
+ new ConcurrentHashMap<>();
inProgressLocalResourcesMap =
- new ConcurrentHashMap<LocalResourceRequest, Path>();
+ new ConcurrentHashMap<>();
}
this.conf = conf;
this.stateStore = stateStore;
@@ -393,7 +394,9 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
return false;
} else { // ResourceState is LOCALIZED or INIT
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
- delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
+ FileDeletionTask deletionTask = new FileDeletionTask(delService,
+ getUser(), getPathToDelete(rsrc.getLocalPath()), null);
+ delService.delete(deletionTask);
}
removeResource(rem.getRequest());
LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache");
@@ -488,7 +491,9 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
LOG.warn("Directory " + uniquePath + " already exists, " +
"try next one.");
if (delService != null) {
- delService.delete(getUser(), uniquePath);
+ FileDeletionTask deletionTask = new FileDeletionTask(delService,
+ getUser(), uniquePath, null);
+ delService.delete(deletionTask);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 663bad7..5bc0da7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -95,7 +95,6 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Localize
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
@@ -113,6 +112,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
@@ -604,7 +604,9 @@ public class ResourceLocalizationService extends CompositeService
private void submitDirForDeletion(String userName, Path dir) {
try {
lfs.getFileStatus(dir);
- delService.delete(userName, dir, new Path[] {});
+ FileDeletionTask deletionTask = new FileDeletionTask(delService, userName,
+ dir, null);
+ delService.delete(deletionTask);
} catch (UnsupportedFileSystemException ue) {
LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue);
} catch (IOException ie) {
@@ -1234,10 +1236,13 @@ public class ResourceLocalizationService extends CompositeService
event.getResource().unlock();
}
if (!paths.isEmpty()) {
- delService.delete(context.getUser(),
- null, paths.toArray(new Path[paths.size()]));
+ FileDeletionTask deletionTask = new FileDeletionTask(delService,
+ context.getUser(), null, paths);
+ delService.delete(deletionTask);
}
- delService.delete(null, nmPrivateCTokensPath, new Path[] {});
+ FileDeletionTask deletionTask = new FileDeletionTask(delService, null,
+ nmPrivateCTokensPath, null);
+ delService.delete(deletionTask);
}
}
@@ -1456,7 +1461,9 @@ public class ResourceLocalizationService extends CompositeService
String appName = fileStatus.getPath().getName();
if (appName.matches("^application_\\d+_\\d+_DEL_\\d+$")) {
LOG.info("delete app log dir," + appName);
- del.delete(null, fileStatus.getPath());
+ FileDeletionTask deletionTask = new FileDeletionTask(del, null,
+ fileStatus.getPath(), null);
+ del.delete(deletionTask);
}
}
}
@@ -1516,7 +1523,9 @@ public class ResourceLocalizationService extends CompositeService
||
status.getPath().getName()
.matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*")) {
- del.delete(null, status.getPath(), new Path[] {});
+ FileDeletionTask deletionTask = new FileDeletionTask(del, null,
+ status.getPath(), null);
+ del.delete(deletionTask);
}
} catch (IOException ex) {
// Do nothing, just give the warning
@@ -1530,24 +1539,25 @@ public class ResourceLocalizationService extends CompositeService
private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del,
Path userDirPath) throws IOException {
RemoteIterator<FileStatus> userDirStatus = lfs.listStatus(userDirPath);
- FileDeletionTask dependentDeletionTask =
- del.createFileDeletionTask(null, userDirPath, new Path[] {});
+ FileDeletionTask dependentDeletionTask = new FileDeletionTask(del, null,
+ userDirPath, new ArrayList<Path>());
if (userDirStatus != null && userDirStatus.hasNext()) {
List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
while (userDirStatus.hasNext()) {
FileStatus status = userDirStatus.next();
String owner = status.getOwner();
- FileDeletionTask deletionTask =
- del.createFileDeletionTask(owner, null,
- new Path[] { status.getPath() });
- deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+ List<Path> pathList = new ArrayList<>();
+ pathList.add(status.getPath());
+ FileDeletionTask deletionTask = new FileDeletionTask(del, owner, null,
+ pathList);
+ deletionTask.addDeletionTaskDependency(dependentDeletionTask);
deletionTasks.add(deletionTask);
}
for (FileDeletionTask task : deletionTasks) {
- del.scheduleFileDeletionTask(task);
+ del.delete(task);
}
} else {
- del.scheduleFileDeletionTask(dependentDeletionTask);
+ del.delete(dependentDeletionTask);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index f465534..0d9e686 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -69,6 +69,8 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.Times;
@@ -258,19 +260,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
return;
}
- if (UserGroupInformation.isSecurityEnabled()) {
- Credentials systemCredentials =
- context.getSystemCredentialsForApps().get(appId);
- if (systemCredentials != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding new framework-token for " + appId
- + " for log-aggregation: " + systemCredentials.getAllTokens()
- + "; userUgi=" + userUgi);
- }
- // this will replace old token
- userUgi.addCredentials(systemCredentials);
- }
- }
+ addCredentials();
// Create a set of Containers whose logs will be uploaded in this cycle.
// It includes:
@@ -332,9 +322,12 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
finishedContainers.contains(container));
if (uploadedFilePathsInThisCycle.size() > 0) {
uploadedLogsInThisCycle = true;
- this.delService.delete(this.userUgi.getShortUserName(), null,
- uploadedFilePathsInThisCycle
- .toArray(new Path[uploadedFilePathsInThisCycle.size()]));
+ List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
+ uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
+ DeletionTask deletionTask = new FileDeletionTask(delService,
+ this.userUgi.getShortUserName(), null,
+ uploadedFilePathsInThisCycleList);
+ delService.delete(deletionTask);
}
// This container is finished, and all its logs have been uploaded,
@@ -352,11 +345,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
}
long currentTime = System.currentTimeMillis();
- final Path renamedPath = this.rollingMonitorInterval <= 0
- ? remoteNodeLogFileForApp : new Path(
- remoteNodeLogFileForApp.getParent(),
- remoteNodeLogFileForApp.getName() + "_"
- + currentTime);
+ final Path renamedPath = getRenamedPath(currentTime);
final boolean rename = uploadedLogsInThisCycle;
try {
@@ -396,6 +385,28 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
}
}
+ private Path getRenamedPath(long currentTime) {
+ return this.rollingMonitorInterval <= 0 ? remoteNodeLogFileForApp
+ : new Path(remoteNodeLogFileForApp.getParent(),
+ remoteNodeLogFileForApp.getName() + "_" + currentTime);
+ }
+
+ private void addCredentials() {
+ if (UserGroupInformation.isSecurityEnabled()) {
+ Credentials systemCredentials =
+ context.getSystemCredentialsForApps().get(appId);
+ if (systemCredentials != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding new framework-token for " + appId
+ + " for log-aggregation: " + systemCredentials.getAllTokens()
+ + "; userUgi=" + userUgi);
+ }
+ // this will replace old token
+ userUgi.addCredentials(systemCredentials);
+ }
+ }
+ }
+
@VisibleForTesting
protected LogWriter createLogWriter() {
return new LogWriter();
@@ -561,8 +572,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
}
if (localAppLogDirs.size() > 0) {
- this.delService.delete(this.userUgi.getShortUserName(), null,
- localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
+ List<Path> localAppLogDirsList = new ArrayList<>();
+ localAppLogDirsList.addAll(localAppLogDirs);
+ DeletionTask deletionTask = new FileDeletionTask(delService,
+ this.userUgi.getShortUserName(), null, localAppLogDirsList);
+ this.delService.delete(deletionTask);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
index 2901743..9961748 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
@@ -247,8 +248,10 @@ public class NonAggregatingLogHandler extends AbstractService implements
new ApplicationEvent(this.applicationId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
if (localAppLogDirs.size() > 0) {
- NonAggregatingLogHandler.this.delService.delete(user, null,
- (Path[]) localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
+ FileDeletionTask deletionTask = new FileDeletionTask(
+ NonAggregatingLogHandler.this.delService, user, null,
+ localAppLogDirs);
+ NonAggregatingLogHandler.this.delService.delete(deletionTask);
}
try {
NonAggregatingLogHandler.this.stateStore.removeLogDeleter(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
index 7831711..7212953 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
@@ -41,6 +41,7 @@ message DeletionServiceDeleteTaskProto {
optional int64 deletionTime = 4;
repeated string basedirs = 5;
repeated int32 successorIds = 6;
+ optional string taskType = 7;
}
message LocalizedResourceProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
index 2e0bbe0..87f4a1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
@@ -33,13 +33,14 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.junit.AfterClass;
import org.junit.Test;
import org.mockito.Mockito;
+
public class TestDeletionService {
private static final FileContext lfs = getLfs();
@@ -123,8 +124,9 @@ public class TestDeletionService {
del.start();
try {
for (Path p : dirs) {
- del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
- p, null);
+ FileDeletionTask deletionTask = new FileDeletionTask(del,
+ (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, null);
+ del.delete(deletionTask);
}
int msecToWait = 20 * 1000;
@@ -159,8 +161,10 @@ public class TestDeletionService {
del.start();
for (Path p : content) {
assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p)));
- del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
- p, baseDirs.toArray(new Path[4]));
+ FileDeletionTask deletionTask = new FileDeletionTask(del,
+ (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p,
+ baseDirs);
+ del.delete(deletionTask);
}
int msecToWait = 20 * 1000;
@@ -196,8 +200,9 @@ public class TestDeletionService {
del.init(conf);
del.start();
for (Path p : dirs) {
- del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p,
- null);
+ FileDeletionTask deletionTask = new FileDeletionTask(del,
+ (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, null);
+ del.delete(deletionTask);
}
int msecToWait = 20 * 1000;
for (Path p : dirs) {
@@ -220,7 +225,9 @@ public class TestDeletionService {
try {
del.init(conf);
del.start();
- del.delete("dingo", new Path("/does/not/exist"));
+ FileDeletionTask deletionTask = new FileDeletionTask(del, "dingo",
+ new Path("/does/not/exist"), null);
+ del.delete(deletionTask);
} finally {
del.stop();
}
@@ -247,18 +254,20 @@ public class TestDeletionService {
// first we will try to delete sub directories which are present. This
// should then trigger parent directory to be deleted.
List<Path> subDirs = buildDirs(r, dirs.get(0), 2);
-
+
FileDeletionTask dependentDeletionTask =
- del.createFileDeletionTask(null, dirs.get(0), new Path[] {});
+ new FileDeletionTask(del, null, dirs.get(0), new ArrayList<Path>());
List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
for (Path subDir : subDirs) {
+ List<Path> subDirList = new ArrayList<>();
+ subDirList.add(subDir);
FileDeletionTask deletionTask =
- del.createFileDeletionTask(null, null, new Path[] { subDir });
- deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+ new FileDeletionTask(del, null, dirs.get(0), subDirList);
+ deletionTask.addDeletionTaskDependency(dependentDeletionTask);
deletionTasks.add(deletionTask);
}
for (FileDeletionTask task : deletionTasks) {
- del.scheduleFileDeletionTask(task);
+ del.delete(task);
}
int msecToWait = 20 * 1000;
@@ -274,19 +283,21 @@ public class TestDeletionService {
subDirs = buildDirs(r, dirs.get(1), 2);
subDirs.add(new Path(dirs.get(1), "absentFile"));
- dependentDeletionTask =
- del.createFileDeletionTask(null, dirs.get(1), new Path[] {});
+ dependentDeletionTask = new FileDeletionTask(del, null, dirs.get(1),
+ new ArrayList<Path>());
deletionTasks = new ArrayList<FileDeletionTask>();
for (Path subDir : subDirs) {
- FileDeletionTask deletionTask =
- del.createFileDeletionTask(null, null, new Path[] { subDir });
- deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
+ List<Path> subDirList = new ArrayList<>();
+ subDirList.add(subDir);
+ FileDeletionTask deletionTask = new FileDeletionTask(del, null, null,
+ subDirList);
+ deletionTask.addDeletionTaskDependency(dependentDeletionTask);
deletionTasks.add(deletionTask);
}
// marking one of the tasks as a failure.
deletionTasks.get(2).setSuccess(false);
for (FileDeletionTask task : deletionTasks) {
- del.scheduleFileDeletionTask(task);
+ del.delete(task);
}
msecToWait = 20 * 1000;
@@ -327,8 +338,10 @@ public class TestDeletionService {
del.start();
for (Path p : content) {
assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p)));
- del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
- p, baseDirs.toArray(new Path[4]));
+ FileDeletionTask deletionTask = new FileDeletionTask(del,
+ (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p,
+ baseDirs);
+ del.delete(deletionTask);
}
// restart the deletion service
@@ -341,8 +354,10 @@ public class TestDeletionService {
// verify paths are still eventually deleted
int msecToWait = 10 * 1000;
for (Path p : baseDirs) {
+ System.out.println("TEST Basedir: " + p.getName());
for (Path q : content) {
Path fp = new Path(p, q);
+ System.out.println("TEST Path: " + fp.toString());
while (msecToWait > 0 && lfs.util().exists(fp)) {
Thread.sleep(100);
msecToWait -= 100;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org