You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:02:02 UTC
svn commit: r1077308 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
c++/task-controller/ mapred/org/apache/hadoop/mapred/
test/org/apache/hadoop/mapred/
Author: omalley
Date: Fri Mar 4 04:02:01 2011
New Revision: 1077308
URL: http://svn.apache.org/viewvc?rev=1077308&view=rev
Log:
commit ac80d37c69e9f8f8669a1edfd65d42feca0fedb3
Author: Hemanth Yamijala <yh...@yahoo-inc.com>
Date: Wed Mar 10 15:17:17 2010 +0530
MAPREDUCE:1422 from https://issues.apache.org/jira/secure/attachment/12438393/mapreduce-1422-y20s.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1422. Fix cleanup of localized job directory to work if files
+ with non-deletable permissions are created within it.
+ (Amar Kamat via yhemanth)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/main.c?rev=1077308&r1=1077307&r2=1077308&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c Fri Mar 4 04:02:01 2011
@@ -228,6 +228,11 @@ int main(int argc, char **argv) {
exit_code = enable_task_for_cleanup(tt_root, user_detail->pw_name, job_id,
dir_to_be_deleted);
break;
+ case ENABLE_JOB_FOR_CLEANUP:
+ tt_root = argv[optind++];
+ job_id = argv[optind++];
+ exit_code = enable_job_for_cleanup(tt_root, user_detail->pw_name, job_id);
+ break;
default:
exit_code = INVALID_COMMAND_PROVIDED;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.c?rev=1077308&r1=1077307&r2=1077308&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c Fri Mar 4 04:02:01 2011
@@ -1053,25 +1053,24 @@ int kill_user_task(const char *user, con
* Before changing permissions, makes sure that the given path doesn't contain
* any relative components.
* tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller
- * dir_to_be_deleted : is either taskDir OR taskWorkDir that is to be deleted
+ * full_path : is either jobLocalDir, taskDir OR taskWorkDir that is to be
+ * deleted
*/
-int enable_task_for_cleanup(const char *tt_root, const char *user,
- const char *jobid, const char *dir_to_be_deleted) {
+static int enable_path_for_cleanup(const char *tt_root, const char *user,
+ char *full_path) {
int exit_code = 0;
gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
- char * full_path = NULL;
if (check_tt_root(tt_root) < 0) {
fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root);
cleanup();
return INVALID_TT_ROOT;
}
- full_path = get_task_dir_path(tt_root, user, jobid, dir_to_be_deleted);
if (full_path == NULL) {
fprintf(LOGFILE,
"Could not build the full path. Not deleting the dir %s\n",
- dir_to_be_deleted);
+ full_path);
exit_code = UNABLE_TO_BUILD_PATH; // may be malloc failed
}
// Make sure that the path given is not having any relative components
@@ -1101,3 +1100,26 @@ int enable_task_for_cleanup(const char *
cleanup();
return exit_code;
}
+
+/**
+ * Enables the task work-dir/local-dir path for deletion.
+ * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller
+ * dir_to_be_deleted : is either taskDir OR taskWorkDir that is to be deleted
+ */
+int enable_task_for_cleanup(const char *tt_root, const char *user,
+ const char *jobid, const char *dir_to_be_deleted) {
+ char *full_path = get_task_dir_path(tt_root, user, jobid, dir_to_be_deleted);
+ return enable_path_for_cleanup(tt_root, user, full_path);
+}
+
+/**
+ * Enables the jobLocalDir for deletion.
+ * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller
+ * user : owner of the job
+ * jobid : id of the job for which the cleanup is needed.
+ */
+int enable_job_for_cleanup(const char *tt_root, const char *user,
+ const char *jobid) {
+ char *full_path = get_job_directory(tt_root, user, jobid);
+ return enable_path_for_cleanup(tt_root, user, full_path);
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.h?rev=1077308&r1=1077307&r2=1077308&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h Fri Mar 4 04:02:01 2011
@@ -44,7 +44,8 @@ enum command {
INITIALIZE_TASK,
TERMINATE_TASK_JVM,
KILL_TASK_JVM,
- ENABLE_TASK_FOR_CLEANUP
+ ENABLE_TASK_FOR_CLEANUP,
+ ENABLE_JOB_FOR_CLEANUP
};
enum errorcodes {
@@ -120,6 +121,9 @@ int kill_user_task(const char *user, con
int enable_task_for_cleanup(const char *tt_root, const char *user,
const char *jobid, const char *dir_to_be_deleted);
+int enable_job_for_cleanup(const char *tt_root, const char *user,
+ const char *jobid);
+
int prepare_attempt_directory(const char *attempt_dir, const char *user);
// The following functions are exposed for testing
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1077308&r1=1077307&r2=1077308&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Mar 4 04:02:01 2011
@@ -143,6 +143,25 @@ public class DefaultTaskController exten
@Override
void enableTaskForCleanup(PathDeletionContext context)
throws IOException {
+ enablePathForCleanup(context);
+ }
+
+ /**
+ * Enables the job for cleanup by changing permissions of the specified path
+ * in the local filesystem
+ */
+ @Override
+ void enableJobForCleanup(PathDeletionContext context)
+ throws IOException {
+ enablePathForCleanup(context);
+ }
+
+ /**
+ * Enables the path for cleanup by changing permissions of the specified path
+ * in the local filesystem
+ */
+ private void enablePathForCleanup(PathDeletionContext context)
+ throws IOException {
try {
FileUtil.chmod(context.fullPath, "u+rwx", true);
} catch(InterruptedException e) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1077308&r1=1077307&r2=1077308&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Mar 4 04:02:01 2011
@@ -152,7 +152,7 @@ class JvmManager {
*/
static void deleteWorkDir(TaskTracker tracker, Task task) throws IOException {
tracker.getCleanupThread().addToQueue(
- TaskTracker.buildTaskControllerPathDeletionContexts(
+ TaskTracker.buildTaskControllerTaskPathDeletionContexts(
tracker.getLocalFileSystem(),
tracker.getLocalFiles(tracker.getJobConf(), ""),
task, true /* workDir */,
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1077308&r1=1077307&r2=1077308&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar 4 04:02:01 2011
@@ -29,8 +29,10 @@ import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.util.StringUtils;
@@ -49,7 +51,7 @@ import org.apache.hadoop.util.Shell.Shel
* <p>task-controller user-name command command-args, where</p>
* <p>user-name is the name of the owner who submits the job</p>
* <p>command is one of the cardinal value of the
- * {@link LinuxTaskController.TaskCommands} enumeration</p>
+ * {@link LinuxTaskController.TaskControllerCommands} enumeration</p>
* <p>command-args depends on the command being launched.</p>
*
* In addition to running and killing tasks, the class also
@@ -83,7 +85,7 @@ class LinuxTaskController extends TaskCo
/**
* List of commands that the setuid script will execute.
*/
- enum TaskCommands {
+ enum TaskControllerCommands {
INITIALIZE_USER,
INITIALIZE_JOB,
INITIALIZE_DISTRIBUTEDCACHE_FILE,
@@ -91,7 +93,8 @@ class LinuxTaskController extends TaskCo
INITIALIZE_TASK,
TERMINATE_TASK_JVM,
KILL_TASK_JVM,
- ENABLE_TASK_FOR_CLEANUP
+ ENABLE_TASK_FOR_CLEANUP,
+ ENABLE_JOB_FOR_CLEANUP
}
@Override
@@ -152,7 +155,7 @@ class LinuxTaskController extends TaskCo
// Call the taskcontroller with the right parameters.
List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context);
ShellCommandExecutor shExec = buildTaskControllerExecutor(
- TaskCommands.LAUNCH_TASK_JVM,
+ TaskControllerCommands.LAUNCH_TASK_JVM,
env.conf.getUser(),
launchTaskJVMArgs, env.workDir, env.env);
context.shExec = shExec;
@@ -181,41 +184,42 @@ class LinuxTaskController extends TaskCo
/**
* Helper method that runs a LinuxTaskController command
*
- * @param taskCommand
+ * @param taskControllerCommand
* @param user
* @param cmdArgs
* @param env
* @throws IOException
*/
- private void runCommand(TaskCommands taskCommand, String user,
- List<String> cmdArgs, File workDir, Map<String, String> env)
+ private void runCommand(TaskControllerCommands taskControllerCommand,
+ String user, List<String> cmdArgs, File workDir, Map<String, String> env)
throws IOException {
ShellCommandExecutor shExec =
- buildTaskControllerExecutor(taskCommand, user, cmdArgs, workDir, env);
+ buildTaskControllerExecutor(taskControllerCommand, user, cmdArgs,
+ workDir, env);
try {
shExec.execute();
} catch (Exception e) {
- LOG.warn("Exit code from " + taskCommand.toString() + " is : "
+ LOG.warn("Exit code from " + taskControllerCommand.toString() + " is : "
+ shExec.getExitCode());
- LOG.warn("Exception thrown by " + taskCommand.toString() + " : "
+ LOG.warn("Exception thrown by " + taskControllerCommand.toString() + " : "
+ StringUtils.stringifyException(e));
- LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
- + " follows:");
+ LOG.info("Output from LinuxTaskController's "
+ + taskControllerCommand.toString() + " follows:");
logOutput(shExec.getOutput());
throw new IOException(e);
}
if (LOG.isDebugEnabled()) {
- LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
- + " follows:");
+ LOG.info("Output from LinuxTaskController's "
+ + taskControllerCommand.toString() + " follows:");
logOutput(shExec.getOutput());
}
}
/**
* Returns list of arguments to be passed while initializing a new task. See
- * {@code buildTaskControllerExecutor(TaskCommands, String, List<String>,
- * JvmEnv)} documentation.
+ * {@code buildTaskControllerExecutor(TaskControllerCommands, String,
+ * List<String>, JvmEnv)} documentation.
*
* @param context
* @return Argument to be used while launching Task VM
@@ -237,10 +241,12 @@ class LinuxTaskController extends TaskCo
void initializeTask(TaskControllerContext context)
throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("Going to do " + TaskCommands.INITIALIZE_TASK.toString()
+ LOG.debug("Going to do "
+ + TaskControllerCommands.INITIALIZE_TASK.toString()
+ " for " + context.task.getTaskID().toString());
}
- runCommand(TaskCommands.INITIALIZE_TASK, context.env.conf.getUser(),
+ runCommand(TaskControllerCommands.INITIALIZE_TASK,
+ context.env.conf.getUser(),
buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
}
@@ -249,7 +255,7 @@ class LinuxTaskController extends TaskCo
* cleanup. Last arg in this List is either $attemptId or $attemptId/work
*/
private List<String> buildTaskCleanupArgs(
- TaskControllerPathDeletionContext context) {
+ TaskControllerTaskPathDeletionContext context) {
List<String> commandArgs = new ArrayList<String>(3);
commandArgs.add(context.mapredLocalDir.toUri().getPath());
commandArgs.add(context.task.getJobID().toString());
@@ -269,39 +275,82 @@ class LinuxTaskController extends TaskCo
}
/**
+ * Builds the args to be passed to task-controller for enabling of job for
+ * cleanup. Last arg in this List is $jobid.
+ */
+ private List<String> buildJobCleanupArgs(
+ TaskControllerJobPathDeletionContext context) {
+ List<String> commandArgs = new ArrayList<String>(2);
+ commandArgs.add(context.mapredLocalDir.toUri().getPath());
+ commandArgs.add(context.jobId.toString());
+
+ return commandArgs;
+ }
+
+ /**
* Enables the task for cleanup by changing permissions of the specified path
* in the local filesystem
*/
@Override
void enableTaskForCleanup(PathDeletionContext context)
throws IOException {
+ if (context instanceof TaskControllerTaskPathDeletionContext) {
+ TaskControllerTaskPathDeletionContext tContext =
+ (TaskControllerTaskPathDeletionContext) context;
+ enablePathForCleanup(tContext,
+ TaskControllerCommands.ENABLE_TASK_FOR_CLEANUP,
+ buildTaskCleanupArgs(tContext));
+ }
+ else {
+ throw new IllegalArgumentException("PathDeletionContext provided is not "
+ + "TaskControllerTaskPathDeletionContext.");
+ }
+ }
+
+ /**
+ * Enables the job for cleanup by changing permissions of the specified path
+ * in the local filesystem
+ */
+ @Override
+ void enableJobForCleanup(PathDeletionContext context)
+ throws IOException {
+ if (context instanceof TaskControllerJobPathDeletionContext) {
+ TaskControllerJobPathDeletionContext tContext =
+ (TaskControllerJobPathDeletionContext) context;
+ enablePathForCleanup(tContext,
+ TaskControllerCommands.ENABLE_JOB_FOR_CLEANUP,
+ buildJobCleanupArgs(tContext));
+ } else {
+ throw new IllegalArgumentException("PathDeletionContext provided is not "
+ + "TaskControllerJobPathDeletionContext.");
+ }
+ }
+
+ /**
+ * Enable a path for cleanup
+ * @param c {@link TaskControllerPathDeletionContext} for the path to be
+ * cleaned up
+ * @param command {@link TaskControllerCommands} for task/job cleanup
+ * @param cleanupArgs arguments for the {@link LinuxTaskController} to enable
+ * path cleanup
+ */
+ private void enablePathForCleanup(TaskControllerPathDeletionContext c,
+ TaskControllerCommands command,
+ List<String> cleanupArgs) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Going to do " + TaskCommands.ENABLE_TASK_FOR_CLEANUP.toString()
- + " for " + context.fullPath);
+ LOG.debug("Going to do " + command.toString() + " for " + c.fullPath);
}
- if (context instanceof TaskControllerPathDeletionContext) {
- TaskControllerPathDeletionContext tContext =
- (TaskControllerPathDeletionContext) context;
-
- if (tContext.task.getUser() != null &&
- tContext.fs instanceof LocalFileSystem) {
- try {
- runCommand(TaskCommands.ENABLE_TASK_FOR_CLEANUP,
- tContext.task.getUser(),
- buildTaskCleanupArgs(tContext), null, null);
- } catch(IOException e) {
- LOG.warn("Uanble to change permissions for " + tContext.fullPath);
- }
- }
- else {
- throw new IllegalArgumentException("Either user is null or the " +
- "file system is not local file system.");
+ if ( c.user != null && c.fs instanceof LocalFileSystem) {
+ try {
+ runCommand(command, c.user, cleanupArgs, null, null);
+ } catch(IOException e) {
+ LOG.warn("Unable to change permissions for " + c.fullPath);
}
}
else {
- throw new IllegalArgumentException("PathDeletionContext provided is not "
- + "TaskControllerPathDeletionContext.");
+ throw new IllegalArgumentException("Either user is null or the "
+ + "file system is not local file system.");
}
}
@@ -323,7 +372,7 @@ class LinuxTaskController extends TaskCo
/**
* Returns list of arguments to be passed while launching task VM.
- * See {@code buildTaskControllerExecutor(TaskCommands,
+ * See {@code buildTaskControllerExecutor(TaskControllerCommands,
* String, List<String>, JvmEnv)} documentation.
* @param context
* @return Argument to be used while launching Task VM
@@ -379,8 +428,8 @@ class LinuxTaskController extends TaskCo
args.add("--");
args.add(context.localizedBaseDir.toString());
args.add(context.uniqueString);
- runCommand(TaskCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE, context.user,
- args, context.workDir, null);
+ runCommand(TaskControllerCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE,
+ context.user, args, context.workDir, null);
}
@Override
@@ -388,7 +437,7 @@ class LinuxTaskController extends TaskCo
throws IOException {
LOG.debug("Going to initialize user directories for " + context.user
+ " on the TT");
- runCommand(TaskCommands.INITIALIZE_USER, context.user,
+ runCommand(TaskControllerCommands.INITIALIZE_USER, context.user,
new ArrayList<String>(), context.workDir, null);
}
@@ -412,7 +461,7 @@ class LinuxTaskController extends TaskCo
* @throws IOException
*/
private ShellCommandExecutor buildTaskControllerExecutor(
- TaskCommands command, String userName, List<String> cmdArgs,
+ TaskControllerCommands command, String userName, List<String> cmdArgs,
File workDir, Map<String, String> env)
throws IOException {
String[] taskControllerCmd = new String[3 + cmdArgs.size()];
@@ -500,14 +549,14 @@ class LinuxTaskController extends TaskCo
throws IOException {
LOG.debug("Going to initialize job " + context.jobid.toString()
+ " on the TT");
- runCommand(TaskCommands.INITIALIZE_JOB, context.user,
+ runCommand(TaskControllerCommands.INITIALIZE_JOB, context.user,
buildInitializeJobCommandArgs(context), context.workDir, null);
}
/**
* API which builds the command line to be pass to LinuxTaskController
* binary to terminate/kill the task. See
- * {@code buildTaskControllerExecutor(TaskCommands,
+ * {@code buildTaskControllerExecutor(TaskControllerCommands,
* String, List<String>, JvmEnv)} documentation.
*
*
@@ -529,7 +578,7 @@ class LinuxTaskController extends TaskCo
* @throws IOException
*/
private void finishTask(TaskControllerContext context,
- TaskCommands command) throws IOException{
+ TaskControllerCommands command) throws IOException{
if(context.task == null) {
LOG.info("Context task null not killing the JVM");
return;
@@ -549,7 +598,7 @@ class LinuxTaskController extends TaskCo
@Override
void terminateTask(TaskControllerContext context) {
try {
- finishTask(context, TaskCommands.TERMINATE_TASK_JVM);
+ finishTask(context, TaskControllerCommands.TERMINATE_TASK_JVM);
} catch (Exception e) {
LOG.warn("Exception thrown while sending kill to the Task VM " +
StringUtils.stringifyException(e));
@@ -559,7 +608,7 @@ class LinuxTaskController extends TaskCo
@Override
void killTask(TaskControllerContext context) {
try {
- finishTask(context, TaskCommands.KILL_TASK_JVM);
+ finishTask(context, TaskControllerCommands.KILL_TASK_JVM);
} catch (Exception e) {
LOG.warn("Exception thrown while sending destroy to the Task VM " +
StringUtils.stringifyException(e));
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1077308&r1=1077307&r2=1077308&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Mar 4 04:02:01 2011
@@ -191,25 +191,26 @@ public abstract class TaskController imp
* Contains info related to the path of the file/dir to be deleted. This info
* is needed by task-controller to build the full path of the file/dir
*/
- static class TaskControllerPathDeletionContext extends PathDeletionContext {
- Task task;
- boolean isWorkDir;
+ static abstract class TaskControllerPathDeletionContext
+ extends PathDeletionContext {
TaskController taskController;
+ String user;
/**
- * mapredLocalDir is the base dir under which to-be-deleted taskWorkDir or
- * taskAttemptDir exists. fullPath of taskAttemptDir or taskWorkDir
- * is built using mapredLocalDir, jobId, taskId, etc.
+ * mapredLocalDir is the base dir under which to-be-deleted jobLocalDir,
+ * taskWorkDir or taskAttemptDir exists. fullPath of jobLocalDir,
+ * taskAttemptDir or taskWorkDir is built using mapredLocalDir, jobId,
+ * taskId, etc.
*/
Path mapredLocalDir;
public TaskControllerPathDeletionContext(FileSystem fs, Path mapredLocalDir,
- Task task, boolean isWorkDir, TaskController taskController) {
+ TaskController taskController,
+ String user) {
super(fs, null);
- this.task = task;
- this.isWorkDir = isWorkDir;
this.taskController = taskController;
this.mapredLocalDir = mapredLocalDir;
+ this.user = user;
}
@Override
@@ -221,18 +222,56 @@ public abstract class TaskController imp
}
/**
+ * Return the component of the path under the {@link #mapredLocalDir} to be
+ * cleaned up. Its the responsibility of the class that extends
+ * {@link TaskControllerPathDeletionContext} to provide the correct
+ * component. For example
+ * - For task related cleanups, either the task-work-dir or task-local-dir
+ * might be returned depending on jvm reuse.
+ * - For job related cleanup, simply the job-local-dir might be returned.
+ */
+ abstract protected String getPath();
+
+ /**
* Builds the path of taskAttemptDir OR taskWorkDir based on
* mapredLocalDir, jobId, taskId, etc
*/
String buildPathForDeletion() {
+ return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + getPath();
+ }
+ }
+
+ /** Contains info related to the path of the file/dir to be deleted. This info
+ * is needed by task-controller to build the full path of the task-work-dir or
+ * task-local-dir depending on whether the jvm is reused or not.
+ */
+ static class TaskControllerTaskPathDeletionContext
+ extends TaskControllerPathDeletionContext {
+ final Task task;
+ final boolean isWorkDir;
+
+ public TaskControllerTaskPathDeletionContext(FileSystem fs,
+ Path mapredLocalDir, Task task, boolean isWorkDir,
+ TaskController taskController) {
+ super(fs, mapredLocalDir, taskController, task.getUser());
+ this.task = task;
+ this.isWorkDir = isWorkDir;
+ }
+
+ /**
+ * Returns the taskWorkDir or taskLocalDir based on whether
+ * {@link TaskControllerTaskPathDeletionContext} is configured to delete
+ * the workDir.
+ */
+ @Override
+ protected String getPath() {
String subDir = (isWorkDir) ? TaskTracker.getTaskWorkDir(task.getUser(),
task.getJobID().toString(), task.getTaskID().toString(),
task.isTaskCleanupTask())
: TaskTracker.getLocalTaskDir(task.getUser(),
task.getJobID().toString(), task.getTaskID().toString(),
task.isTaskCleanupTask());
-
- return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + subDir;
+ return subDir;
}
/**
@@ -248,6 +287,41 @@ public abstract class TaskController imp
}
}
+ /** Contains info related to the path of the file/dir to be deleted. This info
+ * is needed by task-controller to build the full path of the job-local-dir.
+ */
+ static class TaskControllerJobPathDeletionContext
+ extends TaskControllerPathDeletionContext {
+ final JobID jobId;
+
+ public TaskControllerJobPathDeletionContext(FileSystem fs,
+ Path mapredLocalDir, JobID id, String user,
+ TaskController taskController) {
+ super(fs, mapredLocalDir, taskController, user);
+ this.jobId = id;
+ }
+
+ /**
+ * Returns the jobLocalDir of the job to be cleaned up.
+ */
+ @Override
+ protected String getPath() {
+ return TaskTracker.getLocalJobDir(user, jobId.toString());
+ }
+
+ /**
+ * Makes the path(and its sub-directories recursively) fully deletable by
+ * setting proper permissions(770) by task-controller
+ */
+ @Override
+ protected void enablePathForCleanup() throws IOException {
+ getPathForCleanup();// allow init of fullPath, if not inited already
+ if (fs.exists(new Path(fullPath))) {
+ taskController.enableJobForCleanup(this);
+ }
+ }
+ }
+
/**
* NOTE: This class is internal only class and not intended for users!!
*
@@ -322,4 +396,12 @@ public abstract class TaskController imp
*/
public abstract void initializeUser(InitializationContext context)
throws IOException;
+
+ /**
+ * Enable the job for cleanup by changing permissions of the path
+ * @param context path deletion context
+ * @throws IOException
+ */
+ abstract void enableJobForCleanup(PathDeletionContext context)
+ throws IOException;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077308&r1=1077307&r2=1077308&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 04:02:01 2011
@@ -71,6 +71,8 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
+import org.apache.hadoop.mapred.TaskController.TaskControllerTaskPathDeletionContext;
+import org.apache.hadoop.mapred.TaskController.TaskControllerJobPathDeletionContext;
import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
import org.apache.hadoop.mapred.TaskLog.LogName;
import org.apache.hadoop.mapred.TaskStatus.Phase;
@@ -1722,7 +1724,33 @@ public class TaskTracker
}
/**
- * Builds list of TaskControllerPathDeletionContext objects for a task
+ * Builds list of {@link TaskControllerJobPathDeletionContext} objects for a
+ * job each pointing to the job's jobLocalDir.
+ * @param fs : FileSystem in which the dirs to be deleted
+ * @param paths : mapred-local-dirs
+ * @param id : {@link JobID} of the job for which the local-dir needs to
+ * be cleaned up.
+ * @param user : Job owner's username
+ * @param taskController : the task-controller to be used for deletion of
+ * jobLocalDir
+ */
+ static PathDeletionContext[] buildTaskControllerJobPathDeletionContexts(
+ FileSystem fs, Path[] paths, JobID id, String user,
+ TaskController taskController)
+ throws IOException {
+ int i = 0;
+ PathDeletionContext[] contexts =
+ new TaskControllerPathDeletionContext[paths.length];
+
+ for (Path p : paths) {
+ contexts[i++] = new TaskControllerJobPathDeletionContext(fs, p, id, user,
+ taskController);
+ }
+ return contexts;
+ }
+
+ /**
+ * Builds list of TaskControllerTaskPathDeletionContext objects for a task
* @param fs : FileSystem in which the dirs to be deleted
* @param paths : mapred-local-dirs
* @param task : the task whose taskDir or taskWorkDir is going to be deleted
@@ -1730,7 +1758,7 @@ public class TaskTracker
* @param taskController : the task-controller to be used for deletion of
* taskDir or taskWorkDir
*/
- static PathDeletionContext[] buildTaskControllerPathDeletionContexts(
+ static PathDeletionContext[] buildTaskControllerTaskPathDeletionContexts(
FileSystem fs, Path[] paths, Task task, boolean isWorkDir,
TaskController taskController)
throws IOException {
@@ -1739,7 +1767,7 @@ public class TaskTracker
new TaskControllerPathDeletionContext[paths.length];
for (Path p : paths) {
- contexts[i++] = new TaskControllerPathDeletionContext(fs, p, task,
+ contexts[i++] = new TaskControllerTaskPathDeletionContext(fs, p, task,
isWorkDir, taskController);
}
return contexts;
@@ -1773,7 +1801,7 @@ public class TaskTracker
// Delete the job directory for this
// task if the job is done/failed
if (!rjob.keepJobFiles) {
- removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID().toString());
+ removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID());
}
// Remove this job
rjob.tasks.clear();
@@ -1792,10 +1820,11 @@ public class TaskTracker
* @param rjob
* @throws IOException
*/
- void removeJobFiles(String user, String jobId)
+ void removeJobFiles(String user, JobID jobId)
throws IOException {
- PathDeletionContext[] contexts = buildPathDeletionContexts(localFs,
- getLocalFiles(fConf, getLocalJobDir(user, jobId)));
+ PathDeletionContext[] contexts =
+ buildTaskControllerJobPathDeletionContexts(localFs,
+ getLocalFiles(fConf, ""), jobId, user, taskController);
directoryCleanupThread.addToQueue(contexts);
}
@@ -2946,7 +2975,7 @@ public class TaskTracker
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
// No jvm reuse, remove everything
PathDeletionContext[] contexts =
- buildTaskControllerPathDeletionContexts(localFs,
+ buildTaskControllerTaskPathDeletionContexts(localFs,
getLocalFiles(fConf, ""), task, false/* not workDir */,
taskController);
directoryCleanupThread.addToQueue(contexts);
@@ -2966,7 +2995,7 @@ public class TaskTracker
} else {
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
PathDeletionContext[] contexts =
- buildTaskControllerPathDeletionContexts(localFs,
+ buildTaskControllerTaskPathDeletionContexts(localFs,
getLocalFiles(fConf, ""), task, true /* workDir */,
taskController);
directoryCleanupThread.addToQueue(contexts);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077308&r1=1077307&r2=1077308&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar 4 04:02:01 2011
@@ -634,6 +634,7 @@ public class TestTaskTrackerLocalization
taskId.toString(), task.isTaskCleanupTask());
Path[] paths = tracker.getLocalFiles(localizedJobConf, dir);
+ assertTrue("No paths found", paths.length > 0);
for (Path p : paths) {
if (tracker.getLocalFileSystem().exists(p)) {
createFileAndSetPermissions(localizedJobConf, p);
@@ -660,7 +661,7 @@ public class TestTaskTrackerLocalization
// now try to delete the work dir and verify that there are no stale paths
JvmManager.deleteWorkDir(tracker, task);
}
- tracker.removeJobFiles(task.getUser(), jobId.toString());
+ tracker.removeJobFiles(task.getUser(), jobId);
assertTrue("Some task files are not deleted!! Number of stale paths is "
+ cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
@@ -793,4 +794,90 @@ public class TestTaskTrackerLocalization
assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!",
tracker.getLocalFileSystem().exists(logDir));
}
+
+ /**
+ * Test job cleanup by doing the following
+ * - create files with no write permissions to TT under job-work-dir
+ * - create files with no write permissions to TT under task-work-dir
+ */
+ public void testJobCleanup() throws IOException, InterruptedException {
+ if (!canRun()) {
+ return;
+ }
+
+ LOG.info("Running testJobCleanup()");
+ // Localize job and localize task.
+ tracker.getLocalizer().initializeUserDirs(task.getUser());
+ localizedJobConf =
+ tracker.localizeJobFiles(task,
+ new TaskTracker.RunningJob(task.getJobID()));
+
+ // Now initialize the job via task-controller so as to set
+ // ownership/permissions of job-work-dir
+ JobInitializationContext jobContext = new JobInitializationContext();
+ jobContext.jobid = jobId;
+ jobContext.user = localizedJobConf.getUser();
+ jobContext.workDir =
+ new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+ taskController.initializeJob(jobContext);
+
+ // Set an inline cleanup queue
+ InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
+ tracker.setCleanupThread(cleanupQueue);
+
+ // Create a file in job's work-dir with 555
+ String jobWorkDir =
+ TaskTracker.getJobWorkDir(task.getUser(), task.getJobID().toString());
+ Path[] jPaths = tracker.getLocalFiles(localizedJobConf, jobWorkDir);
+ assertTrue("No paths found for job", jPaths.length > 0);
+ for (Path p : jPaths) {
+ if (tracker.getLocalFileSystem().exists(p)) {
+ createFileAndSetPermissions(localizedJobConf, p);
+ }
+ }
+
+ // Initialize task dirs
+ TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+ tip.setJobConf(localizedJobConf);
+ tip.localizeTask(task);
+
+ // Create a file in task local dir with 555
+ // this is to simply test the case where the jvm reuse is enabled and some
+ // files in task-attempt-local-dir are left behind to be cleaned up when the
+ // job finishes.
+ String taskLocalDir =
+ TaskTracker.getLocalTaskDir(task.getUser(), task.getJobID().toString(),
+ task.getTaskID().toString(), false);
+ Path[] tPaths = tracker.getLocalFiles(localizedJobConf, taskLocalDir);
+ assertTrue("No paths found for task", tPaths.length > 0);
+ for (Path p : tPaths) {
+ if (tracker.getLocalFileSystem().exists(p)) {
+ createFileAndSetPermissions(localizedJobConf, p);
+ }
+ }
+
+ // remove the job work dir
+ tracker.removeJobFiles(task.getUser(), task.getJobID());
+
+ // check the task-local-dir
+ boolean tLocalDirExists = false;
+ for (Path p : tPaths) {
+ if (tracker.getLocalFileSystem().exists(p)) {
+ tLocalDirExists = true;
+ }
+ }
+ assertFalse("Task " + task.getTaskID() + " local dir exists after cleanup",
+ tLocalDirExists);
+
+ // Verify that the TaskTracker (via the task-controller) cleans up the dirs.
+ // check the job-work-dir
+ boolean jWorkDirExists = false;
+ for (Path p : jPaths) {
+ if (tracker.getLocalFileSystem().exists(p)) {
+ jWorkDirExists = true;
+ }
+ }
+ assertFalse("Job " + task.getJobID() + " work dir exists after cleanup",
+ jWorkDirExists);
+ }
}