You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:43:44 UTC
svn commit: r1077127 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
c++/task-controller/ mapred/org/apache/hadoop/mapred/
mapred/org/apache/hadoop/mapreduce/server/tasktracker/
test/org/apache/hadoop/mapred/ test/org/apache/hadoop/secur...
Author: omalley
Date: Fri Mar 4 03:43:43 2011
New Revision: 1077127
URL: http://svn.apache.org/viewvc?rev=1077127&view=rev
Log:
commit b2c61f3f33b3c2ff8b42efd7473752b9b4bc125c
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date: Tue Jan 26 15:10:41 2010 +0530
MAPREDUCE:896 from https://issues.apache.org/jira/secure/attachment/12431413/MR-896.v8-y20.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-896. Enhance tasktracker to cleanup files that might have
+ been created by user tasks with non-writable permissions.
+ (Ravi Gummadi via yhemanth)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobDirCleanup.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/main.c?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c Fri Mar 4 03:43:43 2011
@@ -58,6 +58,7 @@ int main(int argc, char **argv) {
NULL, 0 } };
const char* log_file = NULL;
+ char * dir_to_be_deleted = NULL;
//Minimum number of arguments required to run the task-controller
//command-name user command tt-root
@@ -135,6 +136,13 @@ int main(int argc, char **argv) {
task_pid = argv[optind++];
exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGKILL);
break;
+ case ENABLE_TASK_FOR_CLEANUP:
+ tt_root = argv[optind++];
+ job_id = argv[optind++];
+ dir_to_be_deleted = argv[optind++];
+ exit_code = enable_task_for_cleanup(tt_root, user_detail->pw_name, job_id,
+ dir_to_be_deleted);
+ break;
default:
exit_code = INVALID_COMMAND_PROVIDED;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.c?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c Fri Mar 4 03:43:43 2011
@@ -197,6 +197,17 @@ char *get_task_launcher_file(const char
attempt_dir);
}
+/*
+ * Builds the full path of the dir(localTaskDir or localWorkDir)
+ * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller
+ * dir_to_be_deleted : is either taskDir($taskId) OR taskWorkDir($taskId/work)
+ */
+char *get_task_dir_path(const char *tt_root, const char *user,
+ const char *jobid, const char *dir_to_be_deleted) {
+ return concatenate(TT_LOCAL_TASK_DIR_PATTERN, "task_dir_full_path", 4,
+ tt_root, user, jobid, dir_to_be_deleted);
+}
+
/**
* Get the log directory for the given attempt.
*/
@@ -218,17 +229,17 @@ int check_tt_root(const char *tt_root) {
* launcher file resolve to one and same. This is done so as to avoid
* security pitfalls because of relative path components in the file name.
*/
-int check_task_launcher_path(char *path) {
+int check_path_for_relative_components(char *path) {
char * resolved_path = (char *) canonicalize_file_name(path);
if (resolved_path == NULL) {
fprintf(LOGFILE,
- "Error resolving the task launcher file path: %s. Passed path: %s\n",
+ "Error resolving the path: %s. Passed path: %s\n",
strerror(errno), path);
return ERROR_RESOLVING_FILE_PATH;
}
if (strcmp(resolved_path, path) != 0) {
fprintf(LOGFILE,
- "Relative path components in the file path: %s. Resolved path: %s\n",
+ "Relative path components in the path: %s. Resolved path: %s\n",
path, resolved_path);
free(resolved_path);
return RELATIVE_PATH_COMPONENTS_IN_FILE_PATH;
@@ -255,20 +266,23 @@ static int change_owner(const char *path
static int change_mode(const char *path, mode_t mode) {
int exit_code = chmod(path, mode);
if (exit_code != 0) {
- fprintf(LOGFILE, "chown %d of path %s failed: %s.\n", mode, path,
+ fprintf(LOGFILE, "chmod %d of path %s failed: %s.\n", mode, path,
strerror(errno));
}
return exit_code;
}
/**
- * Function to secure the given path. It does the following recursively:
+ * Function to change permissions of the given path. It does the following
+ * recursively:
* 1) changes the owner/group of the paths to the passed owner/group
* 2) changes the file permission to the passed file_mode and directory
* permission to the passed dir_mode
+ *
+ * should_check_ownership : boolean to enable checking of ownership of each path
*/
static int secure_path(const char *path, uid_t uid, gid_t gid,
- mode_t file_mode, mode_t dir_mode) {
+ mode_t file_mode, mode_t dir_mode, int should_check_ownership) {
FTS *tree = NULL; // the file hierarchy
FTSENT *entry = NULL; // a file in the hierarchy
char *paths[] = { (char *) path, NULL };//array needs to be NULL-terminated
@@ -361,7 +375,8 @@ static int secure_path(const char *path,
if (!process_path) {
continue;
}
- if (compare_ownership(uid, gid, entry->fts_path) == 0) {
+ if (should_check_ownership &&
+ (compare_ownership(uid, gid, entry->fts_path) == 0)) {
// already set proper permissions.
// This might happen with distributed cache.
#ifdef DEBUG
@@ -373,7 +388,7 @@ static int secure_path(const char *path,
continue;
}
- if (check_ownership(entry->fts_path) != 0) {
+ if (should_check_ownership && (check_ownership(entry->fts_path) != 0)) {
fprintf(LOGFILE,
"Invalid file path. %s not user/group owned by the tasktracker.\n",
entry->fts_path);
@@ -466,8 +481,9 @@ int prepare_attempt_directories(const ch
free(job_dir);
break;
}
- } else if (secure_path(attempt_dir, user_detail->pw_uid, tasktracker_gid,
- S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG) != 0) {
+ } else if (secure_path(attempt_dir, user_detail->pw_uid,
+ tasktracker_gid, S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG,
+ 1) != 0) {
// No setgid on files and setgid on dirs, 770
fprintf(LOGFILE, "Failed to secure the attempt_dir %s\n", attempt_dir);
failed = 1;
@@ -526,8 +542,8 @@ int prepare_task_logs(const char *log_di
}
gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
- if (secure_path(task_log_dir, user_detail->pw_uid, tasktracker_gid, S_IRWXU
- | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG) != 0) {
+ if (secure_path(task_log_dir, user_detail->pw_uid, tasktracker_gid,
+ S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG, 1) != 0) {
// setgid on dirs but not files, 770. As of now, there are no files though
fprintf(LOGFILE, "Failed to secure the log_dir %s\n", task_log_dir);
return -1;
@@ -640,9 +656,9 @@ int initialize_user(const char *user) {
free(user_dir);
break;
}
- } else if (secure_path(user_dir, user_detail->pw_uid, tasktracker_gid,
- S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG)
- != 0) {
+ } else if (secure_path(user_dir, user_detail->pw_uid,
+ tasktracker_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR |
+ S_IXUSR | S_IRWXG, 1) != 0) {
// No setgid on files and setgid on dirs, 570
fprintf(LOGFILE, "Failed to secure the user_dir %s\n",
user_dir);
@@ -722,7 +738,7 @@ int initialize_job(const char *jobid, co
break;
}
} else if (secure_path(job_dir, user_detail->pw_uid, tasktracker_gid,
- S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG)
+ S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG, 1)
!= 0) {
// No setgid on files and setgid on dirs, 570
fprintf(LOGFILE, "Failed to secure the job_dir %s\n", job_dir);
@@ -848,7 +864,7 @@ int initialize_distributed_cache(const c
}
} else if (secure_path(distcache_dir, user_detail->pw_uid,
tasktracker_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR
- | S_IXUSR | S_IRWXG) != 0) {
+ | S_IXUSR | S_IRWXG, 1) != 0) {
// No setgid on files and setgid on dirs, 570
fprintf(LOGFILE, "Failed to secure the distcache_dir %s\n",
distcache_dir);
@@ -963,7 +979,7 @@ int run_task_as_user(const char * user,
}
errno = 0;
- exit_code = check_task_launcher_path(task_script_path);
+ exit_code = check_path_for_relative_components(task_script_path);
if(exit_code != 0) {
goto cleanup;
}
@@ -1048,3 +1064,60 @@ int kill_user_task(const char *user, con
cleanup();
return 0;
}
+
+/**
+ * Enables the path for deletion by changing the owner, group and permissions
+ * of the specified path and all the files/directories in the path recursively.
+ * * sudo chown user:mapred -R full_path
+ * * sudo chmod 2770 -R full_path
+ * Before changing permissions, makes sure that the given path doesn't contain
+ * any relative components.
+ * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller
+ * dir_to_be_deleted : is either taskDir OR taskWorkDir that is to be deleted
+ */
+int enable_task_for_cleanup(const char *tt_root, const char *user,
+ const char *jobid, const char *dir_to_be_deleted) {
+ int exit_code = 0;
+ gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
+
+ char * full_path = NULL;
+ if (check_tt_root(tt_root) < 0) {
+ fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root);
+ cleanup();
+ return INVALID_TT_ROOT;
+ }
+
+ full_path = get_task_dir_path(tt_root, user, jobid, dir_to_be_deleted);
+ if (full_path == NULL) {
+ fprintf(LOGFILE,
+ "Could not build the full path. Not deleting the dir %s\n",
+ dir_to_be_deleted);
+ exit_code = UNABLE_TO_BUILD_PATH; // may be malloc failed
+ }
+ // Make sure that the path given is not having any relative components
+ else if ((exit_code = check_path_for_relative_components(full_path)) != 0) {
+ fprintf(LOGFILE,
+ "Not changing permissions. Path may contain relative components.\n",
+ full_path);
+ }
+ else if (get_user_details(user) < 0) {
+ fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user);
+ exit_code = INVALID_USER_NAME;
+ }
+ else if (exit_code = secure_path(full_path, user_detail->pw_uid,
+ tasktracker_gid,
+ S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG, 0) != 0) {
+ // No setgid on files and setgid on dirs, 770.
+ // set 770 permissions for user, TTgroup for all files/directories in
+ // 'full_path' recursively sothat deletion of path by TaskTracker succeeds.
+
+ fprintf(LOGFILE, "Failed to set permissions for %s\n", full_path);
+ }
+
+ if (full_path != NULL) {
+ free(full_path);
+ }
+ // free configurations
+ cleanup();
+ return exit_code;
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.h?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h Fri Mar 4 03:43:43 2011
@@ -44,6 +44,7 @@ enum command {
INITIALIZE_TASK,
TERMINATE_TASK_JVM,
KILL_TASK_JVM,
+ ENABLE_TASK_FOR_CLEANUP
};
enum errorcodes {
@@ -67,6 +68,7 @@ enum errorcodes {
OUT_OF_MEMORY, //18
INITIALIZE_DISTCACHE_FAILED, //19
INITIALIZE_USER_FAILED, //20
+ UNABLE_TO_BUILD_PATH //21
};
#define USER_DIR_PATTERN "%s/taskTracker/%s"
@@ -83,6 +85,8 @@ enum errorcodes {
#define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"
+#define TT_LOCAL_TASK_DIR_PATTERN "%s/taskTracker/%s/jobcache/%s/%s"
+
#define TT_SYS_DIR_KEY "mapred.local.dir"
#define TT_LOG_DIR_KEY "hadoop.log.dir"
@@ -109,6 +113,9 @@ int initialize_distributed_cache(const c
int kill_user_task(const char *user, const char *task_pid, int sig);
+int enable_task_for_cleanup(const char *tt_root, const char *user,
+ const char *jobid, const char *dir_to_be_deleted);
+
int prepare_attempt_directory(const char *attempt_dir, const char *user);
// The following functions are exposed for testing
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java Fri Mar 4 03:43:43 2011
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapred;
+import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
@@ -38,7 +39,7 @@ class CleanupQueue {
* paths(directories/files) in a separate thread. This constructor creates a
* clean-up thread and also starts it as a daemon. Callers can instantiate one
* CleanupQueue per JVM and can use it for deleting paths. Use
- * {@link CleanupQueue#addToQueue(FileSystem, Path...)} to add paths for
+ * {@link CleanupQueue#addToQueue(PathDeletionContext...)} to add paths for
* deletion.
*/
public CleanupQueue() {
@@ -49,22 +50,61 @@ class CleanupQueue {
}
}
- public void addToQueue(FileSystem fs, Path...paths) {
- cleanupThread.addToQueue(fs, paths);
+ /**
+ * Contains info related to the path of the file/dir to be deleted
+ */
+ static class PathDeletionContext {
+ String fullPath;// full path of file or dir
+ FileSystem fs;
+
+ public PathDeletionContext(FileSystem fs, String fullPath) {
+ this.fs = fs;
+ this.fullPath = fullPath;
+ }
+
+ protected String getPathForCleanup() {
+ return fullPath;
+ }
+
+ /**
+ * Makes the path(and its subdirectories recursively) fully deletable
+ */
+ protected void enablePathForCleanup() throws IOException {
+ // Do nothing by default.
+ // Subclasses can override to provide enabling for deletion.
+ }
}
- private static class PathCleanupThread extends Thread {
+ /**
+ * Adds the paths to the queue of paths to be deleted by cleanupThread.
+ */
+ void addToQueue(PathDeletionContext... contexts) {
+ cleanupThread.addToQueue(contexts);
+ }
- static class PathAndFS {
- FileSystem fs;
- Path path;
- PathAndFS(FileSystem fs, Path path) {
- this.fs = fs;
- this.path = path;
- }
+ protected static boolean deletePath(PathDeletionContext context)
+ throws IOException {
+ context.enablePathForCleanup();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to delete " + context.fullPath);
+ }
+ if (context.fs.exists(new Path(context.fullPath))) {
+ return context.fs.delete(new Path(context.fullPath), true);
}
+ return true;
+ }
+
+ // currently used by tests only
+ protected boolean isQueueEmpty() {
+ return (cleanupThread.queue.size() == 0);
+ }
+
+ private static class PathCleanupThread extends Thread {
+
// cleanup queue which deletes files/directories of the paths queued up.
- private LinkedBlockingQueue<PathAndFS> queue = new LinkedBlockingQueue<PathAndFS>();
+ private LinkedBlockingQueue<PathDeletionContext> queue =
+ new LinkedBlockingQueue<PathDeletionContext>();
public PathCleanupThread() {
setName("Directory/File cleanup thread");
@@ -72,27 +112,34 @@ class CleanupQueue {
start();
}
- public void addToQueue(FileSystem fs, Path... paths) {
- for (Path p : paths) {
+ void addToQueue(PathDeletionContext[] contexts) {
+ for (PathDeletionContext context : contexts) {
try {
- queue.put(new PathAndFS(fs, p));
- } catch (InterruptedException ie) {}
+ queue.put(context);
+ } catch(InterruptedException ie) {}
}
}
public void run() {
- LOG.debug(getName() + " started.");
- PathAndFS pathAndFS = null;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + " started.");
+ }
+ PathDeletionContext context = null;
while (true) {
try {
- pathAndFS = queue.take();
+ context = queue.take();
// delete the path.
- pathAndFS.fs.delete(pathAndFS.path, true);
- LOG.debug("DELETED " + pathAndFS.path);
+ if (!deletePath(context)) {
+ LOG.warn("CleanupThread:Unable to delete path " + context.fullPath);
+ }
+ else if (LOG.isDebugEnabled()) {
+ LOG.debug("DELETED " + context.fullPath);
+ }
} catch (InterruptedException t) {
+ LOG.warn("Interrupted deletion of " + context.fullPath);
return;
} catch (Exception e) {
- LOG.warn("Error deleting path" + pathAndFS.path);
+ LOG.warn("Error deleting path " + context.fullPath + ": " + e);
}
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Mar 4 03:43:43 2011
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.Shell;
@@ -133,6 +135,23 @@ public class DefaultTaskController exten
}
}
+ /**
+ * Enables the task for cleanup by changing permissions of the specified path
+ * in the local filesystem
+ */
+ @Override
+ void enableTaskForCleanup(PathDeletionContext context)
+ throws IOException {
+ try {
+ FileUtil.chmod(context.fullPath, "ug+rwx", true);
+ } catch(InterruptedException e) {
+ LOG.warn("Interrupted while setting permissions for " + context.fullPath +
+ " for deletion.");
+ } catch(IOException ioe) {
+ LOG.warn("Unable to change permissions of " + context.fullPath);
+ }
+ }
+
@Override
public void initializeDistributedCache(InitializationContext context) {
// Do nothing.
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 03:43:43 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapred.JobHistory.Values;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
@@ -2850,7 +2851,8 @@ class JobInProgress {
}
Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
- new CleanupQueue().addToQueue(jobtracker.getFileSystem(tempDir), tempDir);
+ new CleanupQueue().addToQueue(new PathDeletionContext(
+ jobtracker.getFileSystem(tempDir), tempDir.toUri().getPath()));
} catch (IOException e) {
LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Mar 4 03:43:43 2011
@@ -30,7 +30,6 @@ import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.util.ProcessTree;
@@ -144,6 +143,22 @@ class JvmManager {
}
}
+ /**
+ * Adds the task's work dir to the cleanup queue of taskTracker for
+ * asynchronous deletion of work dir.
+ * @param tracker taskTracker
+ * @param task the task whose work dir needs to be deleted
+ * @throws IOException
+ */
+ static void deleteWorkDir(TaskTracker tracker, Task task) throws IOException {
+ tracker.getCleanupThread().addToQueue(
+ TaskTracker.buildTaskControllerPathDeletionContexts(
+ tracker.getLocalFileSystem(),
+ tracker.getLocalFiles(tracker.getJobConf(), ""),
+ task, true /* workDir */,
+ tracker.getTaskController()));
+ }
+
private static class JvmManagerForType {
//Mapping from the JVM IDs to running Tasks
Map <JVMId,TaskRunner> jvmToRunningTask =
@@ -438,7 +453,7 @@ class JvmManager {
//task at the beginning of each task in the task JVM.
//For the last task, we do it here.
if (env.conf.getNumTasksToExecutePerJvm() != 1) {
- FileUtil.fullyDelete(env.workDir);
+ deleteWorkDir(tracker, initalContext.task);
}
} catch (IOException ie){}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar 4 03:43:43 2011
@@ -29,6 +29,9 @@ import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -87,6 +90,7 @@ class LinuxTaskController extends TaskCo
INITIALIZE_TASK,
TERMINATE_TASK_JVM,
KILL_TASK_JVM,
+ ENABLE_TASK_FOR_CLEANUP
}
/**
@@ -208,12 +212,75 @@ class LinuxTaskController extends TaskCo
@Override
void initializeTask(TaskControllerContext context)
throws IOException {
- LOG.debug("Going to do " + TaskCommands.INITIALIZE_TASK.toString()
- + " for " + context.task.getTaskID().toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Going to do " + TaskCommands.INITIALIZE_TASK.toString()
+ + " for " + context.task.getTaskID().toString());
+ }
runCommand(TaskCommands.INITIALIZE_TASK, context.env.conf.getUser(),
buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
}
+ /**
+ * Builds the args to be passed to task-controller for enabling of task for
+ * cleanup. Last arg in this List is either $attemptId or $attemptId/work
+ */
+ private List<String> buildTaskCleanupArgs(
+ TaskControllerPathDeletionContext context) {
+ List<String> commandArgs = new ArrayList<String>(3);
+ commandArgs.add(context.mapredLocalDir.toUri().getPath());
+ commandArgs.add(context.task.getJobID().toString());
+
+ String workDir = "";
+ if (context.isWorkDir) {
+ workDir = "/work";
+ }
+ if (context.task.isTaskCleanupTask()) {
+ commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX
+ + workDir);
+ } else {
+ commandArgs.add(context.task.getTaskID() + workDir);
+ }
+
+ return commandArgs;
+ }
+
+ /**
+ * Enables the task for cleanup by changing permissions of the specified path
+ * in the local filesystem
+ */
+ @Override
+ void enableTaskForCleanup(PathDeletionContext context)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Going to do " + TaskCommands.ENABLE_TASK_FOR_CLEANUP.toString()
+ + " for " + context.fullPath);
+ }
+
+ if (context instanceof TaskControllerPathDeletionContext) {
+ TaskControllerPathDeletionContext tContext =
+ (TaskControllerPathDeletionContext) context;
+
+ if (tContext.task.getUser() != null &&
+ tContext.fs instanceof LocalFileSystem) {
+ try {
+ runCommand(TaskCommands.ENABLE_TASK_FOR_CLEANUP,
+ tContext.task.getUser(),
+ buildTaskCleanupArgs(tContext), null, null);
+ } catch(IOException e) {
+ LOG.warn("Uanble to change permissions for " + tContext.fullPath);
+ }
+ }
+ else {
+ throw new IllegalArgumentException("Either user is null or the " +
+ "file system is not local file system.");
+ }
+ }
+ else {
+ throw new IllegalArgumentException("PathDeletionContext provided is not "
+ + "TaskControllerPathDeletionContext.");
+ }
+ }
+
private void logOutput(String output) {
String shExecOutput = output;
if (shExecOutput != null) {
@@ -436,7 +503,8 @@ class LinuxTaskController extends TaskCo
}
ShellCommandExecutor shExec = buildTaskControllerExecutor(
command, context.env.conf.getUser(),
- buildKillTaskCommandArgs(context), context.env.workDir, context.env.env);
+ buildKillTaskCommandArgs(context), context.env.workDir,
+ context.env.env);
try {
shExec.execute();
} catch (Exception e) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Mar 4 03:43:43 2011
@@ -24,6 +24,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.util.StringUtils;
@@ -187,6 +190,67 @@ public abstract class TaskController imp
}
/**
+ * Contains info related to the path of the file/dir to be deleted. This info
+ * is needed by task-controller to build the full path of the file/dir
+ */
+ static class TaskControllerPathDeletionContext extends PathDeletionContext {
+ Task task;
+ boolean isWorkDir;
+ TaskController taskController;
+
+ /**
+ * mapredLocalDir is the base dir under which to-be-deleted taskWorkDir or
+ * taskAttemptDir exists. fullPath of taskAttemptDir or taskWorkDir
+ * is built using mapredLocalDir, jobId, taskId, etc.
+ */
+ Path mapredLocalDir;
+
+ public TaskControllerPathDeletionContext(FileSystem fs, Path mapredLocalDir,
+ Task task, boolean isWorkDir, TaskController taskController) {
+ super(fs, null);
+ this.task = task;
+ this.isWorkDir = isWorkDir;
+ this.taskController = taskController;
+ this.mapredLocalDir = mapredLocalDir;
+ }
+
+ @Override
+ protected String getPathForCleanup() {
+ if (fullPath == null) {
+ fullPath = buildPathForDeletion();
+ }
+ return fullPath;
+ }
+
+ /**
+ * Builds the path of taskAttemptDir OR taskWorkDir based on
+ * mapredLocalDir, jobId, taskId, etc
+ */
+ String buildPathForDeletion() {
+ String subDir = (isWorkDir) ? TaskTracker.getTaskWorkDir(task.getUser(),
+ task.getJobID().toString(), task.getTaskID().toString(),
+ task.isTaskCleanupTask())
+ : TaskTracker.getLocalTaskDir(task.getUser(),
+ task.getJobID().toString(), task.getTaskID().toString(),
+ task.isTaskCleanupTask());
+
+ return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + subDir;
+ }
+
+ /**
+ * Makes the path(and its subdirectories recursively) fully deletable by
+ * setting proper permissions(770) by task-controller
+ */
+ @Override
+ protected void enablePathForCleanup() throws IOException {
+ getPathForCleanup();// allow init of fullPath, if not inited already
+ if (fs.exists(new Path(fullPath))) {
+ taskController.enableTaskForCleanup(this);
+ }
+ }
+ }
+
+ /**
* NOTE: This class is internal only class and not intended for users!!
*
*/
@@ -207,6 +271,13 @@ public abstract class TaskController imp
abstract void terminateTask(TaskControllerContext context);
/**
+ * Enable the task for cleanup by changing permissions of the path
+ * @param context path deletion context
+ * @throws IOException
+ */
+ abstract void enableTaskForCleanup(PathDeletionContext context)
+ throws IOException;
+ /**
* Sends a KILL signal to forcefully terminate the taskJVM and its
* sub-processes.
*
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar 4 03:43:43 2011
@@ -638,6 +638,39 @@ abstract class TaskRunner extends Thread
}
/**
+ * Sets permissions recursively and then deletes the contents of dir.
+ * Makes dir empty directory(does not delete dir itself).
+ */
+ static void deleteDirContents(JobConf conf, File dir) throws IOException {
+ FileSystem fs = FileSystem.getLocal(conf);
+ if (fs.exists(new Path(dir.getAbsolutePath()))) {
+ File contents[] = dir.listFiles();
+ if (contents != null) {
+ for (int i = 0; i < contents.length; i++) {
+ try {
+ int ret = 0;
+ if ((ret = FileUtil.chmod(contents[i].getAbsolutePath(),
+ "ug+rwx", true)) != 0) {
+ LOG.warn("Unable to chmod for " + contents[i] +
+ "; chmod exit status = " + ret);
+ }
+ } catch(InterruptedException e) {
+ LOG.warn("Interrupted while setting permissions for contents of " +
+ "workDir. Not deleting the remaining contents of workDir.");
+ return;
+ }
+ if (!fs.delete(new Path(contents[i].getAbsolutePath()), true)) {
+ LOG.warn("Unable to delete "+ contents[i]);
+ }
+ }
+ }
+ }
+ else {
+ LOG.warn(dir + " does not exist.");
+ }
+ }
+
+ /**
* Creates distributed cache symlinks and tmp directory, as appropriate.
* Note that when we setup the distributed
* cache, we didn't create the symlinks. This is done on a per task basis
@@ -647,11 +680,14 @@ abstract class TaskRunner extends Thread
* @param workDir Working directory, which is completely deleted.
*/
public static void setupWorkDir(JobConf conf, File workDir) throws IOException {
- LOG.debug("Fully deleting and re-creating" + workDir);
- FileUtil.fullyDelete(workDir);
- if (!workDir.mkdir()) {
- LOG.debug("Did not recreate " + workDir);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Fully deleting contents of " + workDir);
}
+
+ /** delete only the contents of workDir leaving the directory empty. We
+ * can't delete the workDir as it is the current working directory.
+ */
+ deleteDirContents(conf, workDir);
if (DistributedCache.getSymlink(conf)) {
URI[] archives = DistributedCache.getCacheArchives(conf);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 03:43:43 2011
@@ -69,6 +69,8 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
import org.apache.hadoop.mapred.TaskLog.LogName;
import org.apache.hadoop.mapred.TaskStatus.Phase;
@@ -186,7 +188,7 @@ public class TaskTracker
// The filesystem where job files are stored
FileSystem systemFS = null;
- FileSystem localFs = null;
+ private FileSystem localFs = null;
private final HttpServer server;
volatile boolean shuttingDown = false;
@@ -401,6 +403,11 @@ public class TaskTracker
return taskController;
}
+ // Currently this is used only by tests
+ void setTaskController(TaskController t) {
+ taskController = t;
+ }
+
private RunningJob addTaskToJob(JobID jobId,
TaskInProgress tip) {
synchronized (runningJobs) {
@@ -518,10 +525,7 @@ public class TaskTracker
static String getTaskWorkDir(String user, String jobid, String taskid,
boolean isCleanupAttempt) {
- String dir = getLocalJobDir(user, jobid) + Path.SEPARATOR + taskid;
- if (isCleanupAttempt) {
- dir = dir + TASK_CLEANUP_SUFFIX;
- }
+ String dir = getLocalTaskDir(user, jobid, taskid, isCleanupAttempt);
return dir + Path.SEPARATOR + MRConstants.WORKDIR;
}
@@ -1204,7 +1208,16 @@ public class TaskTracker
taskCleanupThread.start();
directoryCleanupThread = new CleanupQueue();
}
+
+ // only used by tests
+ void setCleanupThread(CleanupQueue c) {
+ directoryCleanupThread = c;
+ }
+ CleanupQueue getCleanupThread() {
+ return directoryCleanupThread;
+ }
+
/**
* The connection to the JobTracker, used by the TaskRunner
* for locating remote files.
@@ -1620,6 +1633,44 @@ public class TaskTracker
}
/**
+ * Builds list of PathDeletionContext objects for the given paths
+ */
+ private static PathDeletionContext[] buildPathDeletionContexts(FileSystem fs,
+ Path[] paths) {
+ int i = 0;
+ PathDeletionContext[] contexts = new PathDeletionContext[paths.length];
+
+ for (Path p : paths) {
+ contexts[i++] = new PathDeletionContext(fs, p.toUri().getPath());
+ }
+ return contexts;
+ }
+
+ /**
+ * Builds list of TaskControllerPathDeletionContext objects for a task
+ * @param fs : FileSystem in which the dirs to be deleted
+ * @param paths : mapred-local-dirs
+ * @param task : the task whose taskDir or taskWorkDir is going to be deleted
+ * @param isWorkDir : the dir to be deleted is workDir or taskDir
+ * @param taskController : the task-controller to be used for deletion of
+ * taskDir or taskWorkDir
+ */
+ static PathDeletionContext[] buildTaskControllerPathDeletionContexts(
+ FileSystem fs, Path[] paths, Task task, boolean isWorkDir,
+ TaskController taskController)
+ throws IOException {
+ int i = 0;
+ PathDeletionContext[] contexts =
+ new TaskControllerPathDeletionContext[paths.length];
+
+ for (Path p : paths) {
+ contexts[i++] = new TaskControllerPathDeletionContext(fs, p, task,
+ isWorkDir, taskController);
+ }
+ return contexts;
+ }
+
+ /**
* The task tracker is done with this job, so we need to clean up.
* @param action The action with the job
* @throws IOException
@@ -1668,8 +1719,9 @@ public class TaskTracker
*/
void removeJobFiles(String user, String jobId)
throws IOException {
- directoryCleanupThread.addToQueue(localFs, getLocalFiles(fConf,
- getLocalJobDir(user, jobId)));
+ PathDeletionContext[] contexts = buildPathDeletionContexts(localFs,
+ getLocalFiles(fConf, getLocalJobDir(user, jobId)));
+ directoryCleanupThread.addToQueue(contexts);
}
/**
@@ -2766,29 +2818,33 @@ public class TaskTracker
runner.close();
}
- String localTaskDir =
- getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
- .toString(), task.isTaskCleanupTask());
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
// No jvm reuse, remove everything
- directoryCleanupThread.addToQueue(localFs, getLocalFiles(
- defaultJobConf, localTaskDir));
+ PathDeletionContext[] contexts =
+ buildTaskControllerPathDeletionContexts(localFs,
+ getLocalFiles(fConf, ""), task, false/* not workDir */,
+ taskController);
+ directoryCleanupThread.addToQueue(contexts);
} else {
// Jvm reuse. We don't delete the workdir since some other task
// (running in the same JVM) might be using the dir. The JVM
// running the tasks would clean the workdir per a task in the
// task process itself.
- directoryCleanupThread.addToQueue(localFs, getLocalFiles(
- defaultJobConf, localTaskDir + Path.SEPARATOR
- + TaskTracker.JOBFILE));
+ String localTaskDir =
+ getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
+ .toString(), task.isTaskCleanupTask());
+ PathDeletionContext[] contexts = buildPathDeletionContexts(
+ localFs, getLocalFiles(defaultJobConf, localTaskDir +
+ Path.SEPARATOR + TaskTracker.JOBFILE));
+ directoryCleanupThread.addToQueue(contexts);
}
} else {
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
- String taskWorkDir =
- getTaskWorkDir(task.getUser(), task.getJobID().toString(),
- taskId.toString(), task.isTaskCleanupTask());
- directoryCleanupThread.addToQueue(localFs, getLocalFiles(
- defaultJobConf, taskWorkDir));
+ PathDeletionContext[] contexts =
+ buildTaskControllerPathDeletionContexts(localFs,
+ getLocalFiles(fConf, ""), task, true /* workDir */,
+ taskController);
+ directoryCleanupThread.addToQueue(contexts);
}
}
}
@@ -3380,17 +3436,28 @@ public class TaskTracker
// get the full paths of the directory in all the local disks.
- private Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
+ Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
String[] localDirs = conf.getLocalDirs();
Path[] paths = new Path[localDirs.length];
FileSystem localFs = FileSystem.getLocal(conf);
+ boolean subdirNeeded = (subdir != null) && (subdir.length() > 0);
for (int i = 0; i < localDirs.length; i++) {
- paths[i] = new Path(localDirs[i], subdir);
+ paths[i] = (subdirNeeded) ? new Path(localDirs[i], subdir)
+ : new Path(localDirs[i]);
paths[i] = paths[i].makeQualified(localFs);
}
return paths;
}
+ FileSystem getLocalFileSystem(){
+ return localFs;
+ }
+
+ // only used by tests
+ void setLocalFileSystem(FileSystem fs){
+ localFs = fs;
+ }
+
int getMaxCurrentMapTasks() {
return maxMapSlots;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Fri Mar 4 03:43:43 2011
@@ -193,7 +193,7 @@ public class Localizer {
synchronized (localizedUser) {
if (localizedUser.get()) {
- // User-directories are already localized for his user.
+ // User-directories are already localized for this user.
LOG.info("User-directories for the user " + user
+ " are already initialized on this TT. Not doing anything.");
return;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Mar 4 03:43:43 2011
@@ -257,7 +257,18 @@ public class MiniMRCluster {
public int getNumTaskTrackers() {
return taskTrackerList.size();
}
-
+
+ /**
+ * Sets inline cleanup threads to all task trackers sothat deletion of
+ * temporary files/dirs happen inline
+ */
+ public void setInlineCleanupThreads() {
+ for (int i = 0; i < getNumTaskTrackers(); i++) {
+ getTaskTrackerRunner(i).getTaskTracker().setCleanupThread(
+ new UtilsForTests.InlineCleanupQueue());
+ }
+ }
+
/**
* Wait until the system is idle.
*/
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobDirCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobDirCleanup.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobDirCleanup.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobDirCleanup.java Fri Mar 4 03:43:43 2011
@@ -58,6 +58,9 @@ public class TestJobDirCleanup extends T
namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(10, namenode, 3,
null, null, mrConf);
+ // make cleanup inline sothat validation of existence of these directories
+ // can be done
+ mr.setInlineCleanupThreads();
final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
JobConf jobConf = mr.createJobConf();
runSleepJob(jobConf);
@@ -66,13 +69,8 @@ public class TestJobDirCleanup extends T
"/taskTracker/jobcache";
File jobDir = new File(jobDirStr);
String[] contents = jobDir.list();
- while (contents.length > 0) {
- try {
- Thread.sleep(1000);
- LOG.warn(jobDir +" not empty yet");
- contents = jobDir.list();
- } catch (InterruptedException ie){}
- }
+ assertTrue("Contents of " + jobDir + " not cleanup.",
+ (contents == null || contents.length == 0));
}
} catch (Exception ee){
} finally {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Fri Mar 4 03:43:43 2011
@@ -44,10 +44,15 @@ public class TestLocalizationWithLinuxTa
private static String taskTrackerSpecialGroup;
@Override
+ protected boolean canRun() {
+ return ClusterWithLinuxTaskController.shouldRun();
+ }
+
+ @Override
protected void setUp()
throws Exception {
- if (!ClusterWithLinuxTaskController.shouldRun()) {
+ if (!canRun()) {
return;
}
@@ -65,7 +70,8 @@ public class TestLocalizationWithLinuxTa
taskController.setConf(trackerFConf);
taskController.setup();
- tracker.setLocalizer(new Localizer(tracker.localFs, localDirs,
+ tracker.setTaskController(taskController);
+ tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
taskController));
// Rewrite conf so as to reflect task's correct user name.
@@ -83,7 +89,7 @@ public class TestLocalizationWithLinuxTa
@Override
protected void tearDown()
throws Exception {
- if (!ClusterWithLinuxTaskController.shouldRun()) {
+ if (!canRun()) {
return;
}
super.tearDown();
@@ -98,21 +104,6 @@ public class TestLocalizationWithLinuxTa
// Do nothing.
}
- /**
- * Test the localization of a user on the TT when {@link LinuxTaskController}
- * is in use.
- */
- @Override
- public void testUserLocalization()
- throws IOException {
-
- if (!ClusterWithLinuxTaskController.shouldRun()) {
- return;
- }
-
- super.testJobLocalization();
- }
-
@Override
protected void checkUserLocalization()
throws IOException {
@@ -150,21 +141,6 @@ public class TestLocalizationWithLinuxTa
}
}
- /**
- * Test job localization with {@link LinuxTaskController}. Also check the
- * permissions and file ownership of the job related files.
- */
- @Override
- public void testJobLocalization()
- throws IOException {
-
- if (!ClusterWithLinuxTaskController.shouldRun()) {
- return;
- }
-
- super.testJobLocalization();
- }
-
@Override
protected void checkJobLocalization()
throws IOException {
@@ -210,21 +186,6 @@ public class TestLocalizationWithLinuxTa
}
}
- /**
- * Test task localization with {@link LinuxTaskController}. Also check the
- * permissions and file ownership of task related files.
- */
- @Override
- public void testTaskLocalization()
- throws IOException {
-
- if (!ClusterWithLinuxTaskController.shouldRun()) {
- return;
- }
-
- super.testTaskLocalization();
- }
-
@Override
protected void checkTaskLocalization()
throws IOException {
@@ -250,16 +211,4 @@ public class TestLocalizationWithLinuxTa
.getUser(), taskTrackerSpecialGroup);
}
}
-
- /**
- * Test cleanup of task files with {@link LinuxTaskController}.
- */
- @Override
- public void testTaskCleanup()
- throws IOException {
- if (!ClusterWithLinuxTaskController.shouldRun()) {
- return;
- }
- super.testTaskCleanup();
- }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Fri Mar 4 03:43:43 2011
@@ -60,6 +60,10 @@ public class TestMiniMRLocalFS extends T
MiniMRCluster mr = null;
try {
mr = new MiniMRCluster(2, "file:///", 3);
+ // make cleanup inline sothat validation of existence of these directories
+ // can be done
+ mr.setInlineCleanupThreads();
+
TestMiniMRWithDFS.runPI(mr, mr.createJobConf());
// run the wordcount example with caching
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Fri Mar 4 03:43:43 2011
@@ -322,6 +322,9 @@ public class TestMiniMRWithDFS extends T
dfs = new MiniDFSCluster(conf, 4, true, null);
fileSys = dfs.getFileSystem();
mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
+ // make cleanup inline sothat validation of existence of these directories
+ // can be done
+ mr.setInlineCleanupThreads();
runPI(mr, mr.createJobConf());
runWordCount(mr, mr.createJobConf());
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java?rev=1077127&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java Fri Mar 4 03:43:43 2011
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+public class TestSetupWorkDir extends TestCase {
+ private static final Log LOG =
+ LogFactory.getLog(TestSetupWorkDir.class);
+
+ /**
+ * Create a file in the given dir and set permissions r_xr_xr_x sothat no one
+ * can delete it directly(without doing chmod).
+ * Creates dir/subDir and dir/subDir/file
+ */
+ static void createFileAndSetPermissions(JobConf jobConf, Path dir)
+ throws IOException {
+ Path subDir = new Path(dir, "subDir");
+ FileSystem fs = FileSystem.getLocal(jobConf);
+ fs.mkdirs(subDir);
+ Path p = new Path(subDir, "file");
+ DataOutputStream out = fs.create(p);
+ out.writeBytes("dummy input");
+ out.close();
+ // no write permission for subDir and subDir/file
+ try {
+ int ret = 0;
+ if((ret = FileUtil.chmod(subDir.toUri().getPath(), "a=rx", true)) != 0) {
+ LOG.warn("chmod failed for " + subDir + ";retVal=" + ret);
+ }
+ } catch(InterruptedException e) {
+ LOG.warn("Interrupted while doing chmod for " + subDir);
+ }
+ }
+
+ /**
+ * Validates if setupWorkDir is properly cleaning up contents of workDir.
+ */
+ public void testSetupWorkDir() throws IOException {
+ Path rootDir = new Path(System.getProperty("test.build.data", "/tmp"),
+ "testSetupWorkDir");
+ Path myWorkDir = new Path(rootDir, "./work");
+ JobConf jConf = new JobConf();
+ FileSystem fs = FileSystem.getLocal(jConf);
+ if (fs.exists(myWorkDir)) {
+ fs.delete(myWorkDir, true);
+ }
+ if (!fs.mkdirs(myWorkDir)) {
+ throw new IOException("Unable to create workDir " + myWorkDir);
+ }
+
+ // create {myWorkDir}/subDir/file and set 555 perms for subDir and file
+ createFileAndSetPermissions(jConf, myWorkDir);
+
+ TaskRunner.deleteDirContents(jConf, new File(myWorkDir.toUri().getPath()));
+
+ assertTrue("Contents of " + myWorkDir + " are not cleaned up properly.",
+ fs.listStatus(myWorkDir).length == 0);
+
+ // cleanup
+ fs.delete(rootDir, true);
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar 4 03:43:43 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.JvmManag
import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
import junit.framework.TestCase;
@@ -73,36 +74,20 @@ public class TestTaskTrackerLocalization
protected File[] attemptLogFiles;
protected JobConf localizedTaskConf;
- class InlineCleanupQueue extends CleanupQueue {
- List<Path> stalePaths = new ArrayList<Path>();
-
- public InlineCleanupQueue() {
- // do nothing
- }
-
- @Override
- public void addToQueue(FileSystem fs, Path... paths) {
- // delete in-line
- for (Path p : paths) {
- try {
- LOG.info("Trying to delete the path " + p);
- if (!fs.delete(p, true)) {
- LOG.warn("Stale path " + p.toUri().getPath());
- stalePaths.add(p);
- }
- } catch (IOException e) {
- LOG.warn("Caught exception while deleting path "
- + p.toUri().getPath());
- LOG.info(StringUtils.stringifyException(e));
- stalePaths.add(p);
- }
- }
- }
+ /**
+ * Dummy method in this base class. Only derived classes will define this
+ * method for checking if a test can be run.
+ */
+ protected boolean canRun() {
+ return true;
}
@Override
protected void setUp()
throws Exception {
+ if (!canRun()) {
+ return;
+ }
TEST_ROOT_DIR =
new File(System.getProperty("test.build.data", "/tmp"), getClass()
.getSimpleName());
@@ -143,8 +128,9 @@ public class TestTaskTrackerLocalization
tracker.setConf(trackerFConf);
// for test case system FS is the local FS
- tracker.localFs = tracker.systemFS = FileSystem.getLocal(trackerFConf);
-
+ tracker.systemFS = FileSystem.getLocal(trackerFConf);
+ tracker.setLocalFileSystem(tracker.systemFS);
+
taskTrackerUGI = UserGroupInformation.login(trackerFConf);
// Set up the task to be localized
@@ -159,8 +145,10 @@ public class TestTaskTrackerLocalization
taskController = new DefaultTaskController();
taskController.setConf(trackerFConf);
taskController.setup();
- tracker.setLocalizer(new Localizer(tracker.localFs, localDirs,
- taskController));
+
+ tracker.setTaskController(taskController);
+ tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
+ taskController));
}
/**
@@ -204,6 +192,9 @@ public class TestTaskTrackerLocalization
@Override
protected void tearDown()
throws Exception {
+ if (!canRun()) {
+ return;
+ }
FileUtil.fullyDelete(TEST_ROOT_DIR);
}
@@ -235,6 +226,9 @@ public class TestTaskTrackerLocalization
*/
public void testTaskControllerSetup()
throws IOException {
+ if (!canRun()) {
+ return;
+ }
// Task-controller is already set up in the test's setup method. Now verify.
for (String localDir : localDirs) {
@@ -258,7 +252,9 @@ public class TestTaskTrackerLocalization
*/
public void testUserLocalization()
throws IOException {
-
+ if (!canRun()) {
+ return;
+ }
// /////////// The main method being tested
tracker.getLocalizer().initializeUserDirs(task.getUser());
// ///////////
@@ -329,7 +325,9 @@ public class TestTaskTrackerLocalization
*/
public void testJobLocalization()
throws IOException {
-
+ if (!canRun()) {
+ return;
+ }
tracker.getLocalizer().initializeUserDirs(task.getUser());
// /////////// The main method being tested
@@ -423,7 +421,9 @@ public class TestTaskTrackerLocalization
*/
public void testTaskLocalization()
throws IOException {
-
+ if (!canRun()) {
+ return;
+ }
tracker.getLocalizer().initializeUserDirs(task.getUser());
localizedJobConf = tracker.localizeJobFiles(task);
@@ -539,14 +539,102 @@ public class TestTaskTrackerLocalization
}
/**
+ * Validates the removal of $taskid and $tasid/work under mapred-local-dir
+ * in cases where those directories cannot be deleted without adding
+ * write permission to the newly created directories under $taskid and
+ * $taskid/work
+ * Also see TestSetupWorkDir.createFileAndSetPermissions for details
+ */
+ void validateRemoveFiles(boolean needCleanup, boolean jvmReuse,
+ TaskInProgress tip) throws IOException {
+ // create files and set permissions 555. Verify if task controller sets
+ // the permissions for TT to delete the taskDir or workDir
+ String dir = (!needCleanup || jvmReuse) ?
+ TaskTracker.getTaskWorkDir(task.getUser(), task.getJobID().toString(),
+ taskId.toString(), task.isTaskCleanupTask())
+ : TaskTracker.getLocalTaskDir(task.getUser(), task.getJobID().toString(),
+ taskId.toString(), task.isTaskCleanupTask());
+
+ Path[] paths = tracker.getLocalFiles(localizedJobConf, dir);
+ for (Path p : paths) {
+ if (tracker.getLocalFileSystem().exists(p)) {
+ TestSetupWorkDir.createFileAndSetPermissions(localizedJobConf, p);
+ }
+ }
+
+ InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
+ tracker.setCleanupThread(cleanupQueue);
+
+ tip.removeTaskFiles(needCleanup, taskId);
+
+ if (jvmReuse) {
+ // work dir should still exist and cleanup queue should be empty
+ assertTrue("cleanup queue is not empty after removeTaskFiles() in case "
+ + "of jvm reuse.", cleanupQueue.isQueueEmpty());
+ boolean workDirExists = false;
+ for (Path p : paths) {
+ if (tracker.getLocalFileSystem().exists(p)) {
+ workDirExists = true;
+ }
+ }
+ assertTrue("work dir does not exist in case of jvm reuse", workDirExists);
+
+ // now try to delete the work dir and verify that there are no stale paths
+ JvmManager.deleteWorkDir(tracker, task);
+ }
+ tracker.removeJobFiles(task.getUser(), jobId.toString());
+
+ assertTrue("Some task files are not deleted!! Number of stale paths is "
+ + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
+ }
+
+ /**
+ * Validates if task cleanup is done properly for a succeeded task
* @throws IOException
*/
public void testTaskCleanup()
throws IOException {
+ if (!canRun()) {
+ return;
+ }
+ testTaskCleanup(false, false);// no needCleanup; no jvmReuse
+ }
+ /**
+ * Validates if task cleanup is done properly for a task that is not succeeded
+ * @throws IOException
+ */
+ public void testFailedTaskCleanup()
+ throws IOException {
+ if (!canRun()) {
+ return;
+ }
+ testTaskCleanup(true, false);// needCleanup; no jvmReuse
+ }
+
+ /**
+ * Validates if task cleanup is done properly for a succeeded task
+ * @throws IOException
+ */
+ public void testTaskCleanupWithJvmUse()
+ throws IOException {
+ if (!canRun()) {
+ return;
+ }
+ testTaskCleanup(false, true);// no needCleanup; jvmReuse
+ }
+
+ /**
+ * Validates if task cleanup is done properly
+ */
+ private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
+ throws IOException {
// Localize job and localize task.
tracker.getLocalizer().initializeUserDirs(task.getUser());
localizedJobConf = tracker.localizeJobFiles(task);
+ if (jvmReuse) {
+ localizedJobConf.setNumTasksToExecutePerJvm(2);
+ }
// Now initialize the job via task-controller so as to set
// ownership/permissions of jars, job-work-dir
JobInitializationContext jobContext = new JobInitializationContext();
@@ -585,18 +673,9 @@ public class TestTaskTrackerLocalization
// TODO: Let the task run and create files.
- InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
- tracker.directoryCleanupThread = cleanupQueue;
-
- // ////////// The central methods being tested
- tip.removeTaskFiles(true, taskId);
- tracker.removeJobFiles(task.getUser(), jobId.toString());
- // //////////
-
- // TODO: make sure that all files intended to be deleted are deleted.
-
- assertTrue("Some task files are not deleted!! Number of stale paths is "
- + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
+ // create files and set permissions 555. Verify if task controller sets
+ // the permissions for TT to delete the task dir or work dir properly
+ validateRemoveFiles(needCleanup, jvmReuse, tip);
// Check that the empty $mapred.local.dir/taskTracker/$user dirs are still
// there.
@@ -604,7 +683,7 @@ public class TestTaskTrackerLocalization
Path userDir =
new Path(localDir, TaskTracker.getUserDir(task.getUser()));
assertTrue("User directory " + userDir + " is not present!!",
- tracker.localFs.exists(userDir));
+ tracker.getLocalFileSystem().exists(userDir));
}
// Test userlogs cleanup.
@@ -624,7 +703,7 @@ public class TestTaskTrackerLocalization
// Logs should be there before cleanup.
assertTrue("Userlogs dir " + logDir + " is not presen as expected!!",
- tracker.localFs.exists(logDir));
+ tracker.getLocalFileSystem().exists(logDir));
// ////////// Another being tested
TaskLog.cleanup(-1); // -1 so as to move purgeTimeStamp to future and file
@@ -633,6 +712,6 @@ public class TestTaskTrackerLocalization
// Logs should be gone after cleanup.
assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!",
- tracker.localFs.exists(logDir));
+ tracker.getLocalFileSystem().exists(logDir));
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Mar 4 03:43:43 2011
@@ -19,9 +19,11 @@
package org.apache.hadoop.mapred;
import java.text.DecimalFormat;
+import java.util.ArrayList;
import java.io.*;
import java.util.Arrays;
import java.util.Iterator;
+import java.util.List;
import java.util.Enumeration;
import java.util.Properties;
@@ -46,6 +48,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.StringUtils;
/**
* Utilities used in unit test.
@@ -426,7 +429,37 @@ public class UtilsForTests {
}
}
}
-
+
+ /**
+ * Cleans up files/dirs inline. CleanupQueue deletes in a separate thread
+ * asynchronously.
+ */
+ public static class InlineCleanupQueue extends CleanupQueue {
+ List<String> stalePaths = new ArrayList<String>();
+
+ public InlineCleanupQueue() {
+ // do nothing
+ }
+
+ @Override
+ public void addToQueue(PathDeletionContext... contexts) {
+ // delete paths in-line
+ for (PathDeletionContext context : contexts) {
+ try {
+ if (!deletePath(context)) {
+ LOG.warn("Stale path " + context.fullPath);
+ stalePaths.add(context.fullPath);
+ }
+ } catch (IOException e) {
+ LOG.warn("Caught exception while deleting path "
+ + context.fullPath);
+ LOG.info(StringUtils.stringifyException(e));
+ stalePaths.add(context.fullPath);
+ }
+ }
+ }
+ }
+
static String getTaskSignalParameter(boolean isMap) {
return isMap
? "test.mapred.map.waiting.target"
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java Fri Mar 4 03:43:43 2011
@@ -58,7 +58,10 @@ public class TestServiceLevelAuthorizati
JobConf mrConf = new JobConf(conf);
mr = new MiniMRCluster(slaves, fileSys.getUri().toString(), 1,
null, null, mrConf);
-
+ // make cleanup inline sothat validation of existence of these directories
+ // can be done
+ mr.setInlineCleanupThreads();
+
// Run examples
TestMiniMRWithDFS.runPI(mr, mr.createJobConf(mrConf));
TestMiniMRWithDFS.runWordCount(mr, mr.createJobConf(mrConf));