You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/04/23 00:56:59 UTC
svn commit: r1096075 - in /hadoop/common/branches/branch-0.20-security: ./
src/c++/task-controller/impl/ src/c++/task-controller/test/
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/filecache/
src/test/org/apache/hadoop/mapred/
Author: omalley
Date: Fri Apr 22 22:56:58 2011
New Revision: 1096075
URL: http://svn.apache.org/viewvc?rev=1096075&view=rev
Log:
MAPREDUCE-2413. TaskTracker should handle disk failures by reinitializing
itself. (Ravi Gummadi and Jagane Sundar via omalley)
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/configuration.c
hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/configuration.h
hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c
hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c
hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h
hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MRConstants.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJvmManager.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Apr 22 22:56:58 2011
@@ -60,6 +60,9 @@ Release 0.20.204.0 - unreleased
HDFS-1767. Namenode ignores non-initial block report from datanodes
when in safemode during startup. (Matt Foley via suresh)
+ MAPREDUCE-2413. TaskTracker should handle disk failures by reinitializing
+ itself. (Ravi Gummadi and Jagane Sundar via omalley)
+
Release 0.20.203.0 - unreleased
HADOOP-7190. Add metrics v1 back for backwards compatibility. (omalley)
Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/configuration.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/configuration.c?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/configuration.c (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/configuration.c Fri Apr 22 22:56:58 2011
@@ -241,8 +241,9 @@ void read_config(const char* file_name)
/*
* function used to get a configuration value.
* The function for the first time populates the configuration details into
- * array, next time onwards used the populated array.
+ * array, next time onwards uses the populated array.
*
+ * Memory returned here should be freed using free.
*/
char * get_value(const char* key) {
int count;
@@ -259,8 +260,15 @@ char * get_value(const char* key) {
* Value delimiter is assumed to be a comma.
*/
char ** get_values(const char * key) {
- char ** toPass = NULL;
char *value = get_value(key);
+ return extract_values(value);
+}
+
+/**
+ * Extracts array of values from the comma separated list of values.
+ */
+char ** extract_values(char * value) {
+ char ** toPass = NULL;
char *tempTok = NULL;
char *tempstr = NULL;
int size = 0;
@@ -286,9 +294,13 @@ char ** get_values(const char * key) {
return toPass;
}
-// free an entry set of values
+/**
+ * Free an entry set of values.
+ */
void free_values(char** values) {
if (*values != NULL) {
+ // the values were tokenized from the same malloc, so freeing the first
+ // frees the entire block.
free(*values);
}
if (values != NULL) {
Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/configuration.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/configuration.h?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/configuration.h (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/configuration.h Fri Apr 22 22:56:58 2011
@@ -34,6 +34,9 @@ char *get_value(const char* key);
//comma seperated strings.
char ** get_values(const char* key);
+// Extracts array of values from the comma separated list of values.
+char ** extract_values(char * value);
+
// free the memory returned by get_values
void free_values(char** values);
Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/main.c?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c Fri Apr 22 22:56:58 2011
@@ -35,7 +35,9 @@
void display_usage(FILE *stream) {
fprintf(stream,
- "Usage: task-controller user command command-args\n");
+ "Usage: task-controller user good-local-dirs command command-args\n");
+ fprintf(stream, " where good-local-dirs is a comma separated list of " \
+ "good mapred local directories.\n");
fprintf(stream, "Commands:\n");
fprintf(stream, " initialize job: %2d jobid credentials cmd args\n",
INITIALIZE_JOB);
@@ -53,13 +55,14 @@ void display_usage(FILE *stream) {
int main(int argc, char **argv) {
//Minimum number of arguments required to run the task-controller
- if (argc < 4) {
+ if (argc < 5) {
display_usage(stdout);
return INVALID_ARGUMENT_NUMBER;
}
LOGFILE = stdout;
int command;
+ const char * good_local_dirs = NULL;
const char * job_id = NULL;
const char * task_id = NULL;
const char * cred_file = NULL;
@@ -124,41 +127,46 @@ int main(int argc, char **argv) {
}
optind = optind + 1;
+ good_local_dirs = argv[optind];
+ if (good_local_dirs == NULL) {
+ return INVALID_TT_ROOT;
+ }
+
+ optind = optind + 1;
command = atoi(argv[optind++]);
fprintf(LOGFILE, "main : command provided %d\n",command);
fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
+ fprintf(LOGFILE, "Good mapred-local-dirs are %s\n", good_local_dirs);
switch (command) {
case INITIALIZE_JOB:
- if (argc < 7) {
- fprintf(LOGFILE, "Too few arguments (%d vs 7) for initialize job\n",
- argc);
+ if (argc < 8) {
+ fprintf(LOGFILE, "Too few arguments (%d vs 8) for initialize job\n",
+ argc);
return INVALID_ARGUMENT_NUMBER;
}
job_id = argv[optind++];
cred_file = argv[optind++];
job_xml = argv[optind++];
- exit_code = initialize_job(user_detail->pw_name, job_id, cred_file,
- job_xml, argv + optind);
+ exit_code = initialize_job(user_detail->pw_name, good_local_dirs, job_id,
+ cred_file, job_xml, argv + optind);
break;
case LAUNCH_TASK_JVM:
- if (argc < 7) {
- fprintf(LOGFILE, "Too few arguments (%d vs 7) for launch task\n",
- argc);
+ if (argc < 8) {
+ fprintf(LOGFILE, "Too few arguments (%d vs 8) for launch task\n", argc);
return INVALID_ARGUMENT_NUMBER;
}
job_id = argv[optind++];
task_id = argv[optind++];
current_dir = argv[optind++];
script_file = argv[optind++];
- exit_code = run_task_as_user(user_detail->pw_name, job_id, task_id,
- current_dir, script_file);
+ exit_code = run_task_as_user(user_detail->pw_name, good_local_dirs, job_id,
+ task_id, current_dir, script_file);
break;
case SIGNAL_TASK:
- if (argc < 5) {
- fprintf(LOGFILE, "Too few arguments (%d vs 5) for signal task\n",
- argc);
+ if (argc < 6) {
+ fprintf(LOGFILE, "Too few arguments (%d vs 6) for signal task\n", argc);
return INVALID_ARGUMENT_NUMBER;
} else {
char* end_ptr = NULL;
@@ -179,7 +187,8 @@ int main(int argc, char **argv) {
break;
case DELETE_AS_USER:
dir_to_be_deleted = argv[optind++];
- exit_code= delete_as_user(user_detail->pw_name, dir_to_be_deleted);
+ exit_code= delete_as_user(user_detail->pw_name, good_local_dirs,
+ dir_to_be_deleted);
break;
case DELETE_LOG_AS_USER:
dir_to_be_deleted = argv[optind++];
Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/task-controller.c?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c Fri Apr 22 22:56:58 2011
@@ -42,8 +42,6 @@
#define TT_LOCAL_TASK_DIR_PATTERN "%s/taskTracker/%s/jobcache/%s/%s"
-#define TT_SYS_DIR_KEY "mapred.local.dir"
-
#define TT_LOG_DIR_KEY "hadoop.log.dir"
#define JOB_FILENAME "job.xml"
@@ -210,6 +208,14 @@ int change_user(uid_t user, gid_t group)
}
/**
+ * Get the array of mapred local dirs from the given comma separated list of
+ * paths. Memory allocated using strdup here is freed up in free_values().
+ */
+char ** get_mapred_local_dirs(const char * good_local_dirs) {
+ return extract_values(strdup(good_local_dirs));
+}
+
+/**
* Utility function to concatenate argB to argA using the concat_pattern.
*/
char *concatenate(char *concat_pattern, char *return_path_name,
@@ -353,8 +359,8 @@ int mkdirs(const char* path, mode_t perm
* Function to prepare the attempt directories for the task JVM.
* It creates the task work and log directories.
*/
-static int create_attempt_directories(const char* user, const char *job_id,
- const char *task_id) {
+static int create_attempt_directories(const char* user,
+ const char * good_local_dirs, const char *job_id, const char *task_id) {
// create dirs as 0750
const mode_t perms = S_IRWXU | S_IRGRP | S_IXGRP;
if (job_id == NULL || task_id == NULL || user == NULL) {
@@ -364,11 +370,11 @@ static int create_attempt_directories(co
}
int result = 0;
- char **local_dir = get_values(TT_SYS_DIR_KEY);
+ char **local_dir = get_mapred_local_dirs(good_local_dirs);
if (local_dir == NULL) {
- fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
- return -1;
+ fprintf(LOGFILE, "Good mapred local directories could not be obtained.\n");
+ return INVALID_TT_ROOT;
}
char **local_dir_ptr;
@@ -635,10 +641,10 @@ static int copy_file(int input, const ch
/**
* Function to initialize the user directories of a user.
*/
-int initialize_user(const char *user) {
- char **local_dir = get_values(TT_SYS_DIR_KEY);
+int initialize_user(const char *user, const char * good_local_dirs) {
+ char **local_dir = get_mapred_local_dirs(good_local_dirs);
if (local_dir == NULL) {
- fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+ fprintf(LOGFILE, "Good mapred local directories could ot be obtained.\n");
return INVALID_TT_ROOT;
}
@@ -664,16 +670,16 @@ int initialize_user(const char *user) {
/**
* Function to prepare the job directories for the task JVM.
*/
-int initialize_job(const char *user, const char *jobid,
- const char* credentials, const char* job_xml,
- char* const* args) {
+int initialize_job(const char *user, const char * good_local_dirs,
+ const char *jobid, const char* credentials, const char* job_xml,
+ char* const* args) {
if (jobid == NULL || user == NULL) {
fprintf(LOGFILE, "Either jobid is null or the user passed is null.\n");
return INVALID_ARGUMENT_NUMBER;
}
// create the user directory
- int result = initialize_user(user);
+ int result = initialize_user(user, good_local_dirs);
if (result != 0) {
return result;
}
@@ -707,10 +713,10 @@ int initialize_job(const char *user, con
// 750
mode_t permissions = S_IRWXU | S_IRGRP | S_IXGRP;
- char **tt_roots = get_values(TT_SYS_DIR_KEY);
+ char **tt_roots = get_mapred_local_dirs(good_local_dirs);
if (tt_roots == NULL) {
- return INVALID_CONFIG_FILE;
+ return INVALID_TT_ROOT;
}
char **tt_root;
@@ -771,12 +777,12 @@ int initialize_job(const char *user, con
* 4) Does an execlp on the same in order to replace the current image with
* task image.
*/
-int run_task_as_user(const char *user, const char *job_id,
- const char *task_id, const char *work_dir,
- const char *script_name) {
+int run_task_as_user(const char *user, const char * good_local_dirs,
+ const char *job_id, const char *task_id,
+ const char *work_dir, const char *script_name) {
int exit_code = -1;
char *task_script_path = NULL;
- if (create_attempt_directories(user, job_id, task_id) != 0) {
+ if (create_attempt_directories(user, good_local_dirs, job_id, task_id) != 0) {
goto cleanup;
}
int task_file_source = open_file_as_task_tracker(script_name);
@@ -1002,15 +1008,15 @@ static int delete_path(const char *full_
* user: the user doing the delete
* subdir: the subdir to delete
*/
-int delete_as_user(const char *user,
+int delete_as_user(const char *user, const char * good_local_dirs,
const char *subdir) {
int ret = 0;
- char** tt_roots = get_values(TT_SYS_DIR_KEY);
+ char** tt_roots = get_mapred_local_dirs(good_local_dirs);
char** ptr;
if (tt_roots == NULL || *tt_roots == NULL) {
- fprintf(LOGFILE, "No %s defined in the configuration\n", TT_SYS_DIR_KEY);
- return INVALID_CONFIG_FILE;
+ fprintf(LOGFILE, "Good mapred local directories could ot be obtained.\n");
+ return INVALID_TT_ROOT;
}
// do the delete
Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/task-controller.h?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h Fri Apr 22 22:56:58 2011
@@ -74,19 +74,20 @@ int check_taskcontroller_permissions(cha
int delete_log_directory(const char *log_dir);
// initialize the job directory
-int initialize_job(const char *user, const char *jobid,
+int initialize_job(const char *user, const char * good_local_dirs, const char *jobid,
const char *credentials,
const char *job_xml, char* const* args);
// run the task as the user
-int run_task_as_user(const char * user, const char *jobid, const char *taskid,
+int run_task_as_user(const char * user, const char * good_local_dirs,
+ const char *jobid, const char *taskid,
const char *work_dir, const char *script_name);
// send a signal as the user
int signal_user_task(const char *user, int pid, int sig);
// delete a directory (or file) recursively as the user.
-int delete_as_user(const char *user,
+int delete_as_user(const char *user, const char * good_local_dirs,
const char *dir_to_be_deleted);
// run a command as the user
@@ -140,7 +141,7 @@ int mkdirs(const char* path, mode_t perm
/**
* Function to initialize the user directories of a user.
*/
-int initialize_user(const char *user);
+int initialize_user(const char *user, const char * good_local_dirs);
/**
* Create a top level directory for the user.
Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/test/test-task-controller.c?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c Fri Apr 22 22:56:58 2011
@@ -637,7 +637,7 @@ void test_run_task() {
strerror(errno));
exit(1);
} else if (child == 0) {
- if (run_task_as_user(username, "job_4", "task_1",
+ if (run_task_as_user(username, "", "job_4", "task_1",
task_dir, script_name) != 0) {
printf("FAIL: failed in child\n");
exit(42);
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Apr 22 22:56:58 2011
@@ -22,7 +22,6 @@ import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -32,9 +31,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
import org.apache.hadoop.util.ProcessTree.Signal;
import org.apache.hadoop.util.ProcessTree;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.commons.logging.Log;
@@ -249,8 +248,9 @@ public class DefaultTaskController exten
}
@Override
- public void setup(LocalDirAllocator allocator) {
+ public void setup(LocalDirAllocator allocator, LocalStorage l) {
this.allocator = allocator;
+ this.localStorage = l;
}
}
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Apr 22 22:56:58 2011
@@ -23,7 +23,6 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,6 +33,7 @@ import org.apache.hadoop.fs.LocalDirAllo
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
import org.apache.hadoop.util.ProcessTree.Signal;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -48,8 +48,10 @@ import org.apache.hadoop.util.StringUtil
* JVM and killing it when needed, and also initializing and
* finalizing the task environment.
* <p> The setuid executable is launched using the command line:</p>
- * <p>task-controller user-name command command-args, where</p>
+ * <p>task-controller user-name good-local-dirs command command-args,
+ * where</p>
* <p>user-name is the name of the owner who submits the job</p>
+ * <p>good-local-dirs is a comma separated list of good mapred local dirs</p>
* <p>command is one of the cardinal value of the
* {@link LinuxTaskController.TaskControllerCommands} enumeration</p>
* <p>command-args depends on the command being launched.</p>
@@ -122,7 +124,8 @@ class LinuxTaskController extends TaskCo
}
@Override
- public void setup(LocalDirAllocator allocator) throws IOException {
+ public void setup(LocalDirAllocator allocator, LocalStorage localStorage)
+ throws IOException {
// Check the permissions of the task-controller binary by running it plainly.
// If permissions are correct, it returns an error code 1, else it returns
@@ -142,8 +145,8 @@ class LinuxTaskController extends TaskCo
}
}
this.allocator = allocator;
+ this.localStorage = localStorage;
}
-
@Override
public void initializeJob(String user, String jobid, Path credentials,
@@ -152,7 +155,8 @@ class LinuxTaskController extends TaskCo
) throws IOException {
List<String> command = new ArrayList<String>(
Arrays.asList(taskControllerExe,
- user,
+ user,
+ localStorage.getGoodLocalDirsString(),
Integer.toString(Commands.INITIALIZE_JOB.getValue()),
jobid,
credentials.toUri().getPath().toString(),
@@ -216,6 +220,7 @@ class LinuxTaskController extends TaskCo
String[] command =
new String[]{taskControllerExe,
user,
+ localStorage.getGoodLocalDirsString(),
Integer.toString(Commands.LAUNCH_TASK_JVM.getValue()),
jobId,
attemptId,
@@ -256,6 +261,7 @@ class LinuxTaskController extends TaskCo
String[] command =
new String[]{taskControllerExe,
user,
+ localStorage.getGoodLocalDirsString(),
Integer.toString(Commands.DELETE_AS_USER.getValue()),
subDir};
ShellCommandExecutor shExec = new ShellCommandExecutor(command);
@@ -270,6 +276,7 @@ class LinuxTaskController extends TaskCo
String[] command =
new String[]{taskControllerExe,
user,
+ localStorage.getGoodLocalDirsString(),
Integer.toString(Commands.DELETE_LOG_AS_USER.getValue()),
subDir};
ShellCommandExecutor shExec = new ShellCommandExecutor(command);
@@ -285,6 +292,7 @@ class LinuxTaskController extends TaskCo
String[] command =
new String[]{taskControllerExe,
user,
+ localStorage.getGoodLocalDirsString(),
Integer.toString(Commands.SIGNAL_TASK.getValue()),
Integer.toString(taskPid),
Integer.toString(signal.getValue())};
@@ -316,9 +324,10 @@ class LinuxTaskController extends TaskCo
Task firstTask = allAttempts.get(0);
String taskid = firstTask.getTaskID().toString();
- LocalDirAllocator ldirAlloc = new LocalDirAllocator("mapred.local.dir");
+ LocalDirAllocator ldirAlloc =
+ new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
String taskRanFile = TaskTracker.TT_LOG_TMP_DIR + Path.SEPARATOR + taskid;
- Configuration conf = new Configuration();
+ Configuration conf = getConf();
//write the serialized task information to a file to pass to the truncater
Path taskRanFilePath =
@@ -348,12 +357,13 @@ class LinuxTaskController extends TaskCo
command.add(TaskLogsTruncater.class.getName());
command.add(taskRanFilePath.toString());
- String[] taskControllerCmd = new String[3 + command.size()];
+ String[] taskControllerCmd = new String[4 + command.size()];
taskControllerCmd[0] = taskControllerExe;
taskControllerCmd[1] = user;
- taskControllerCmd[2] = Integer.toString(
+ taskControllerCmd[2] = localStorage.getGoodLocalDirsString();
+ taskControllerCmd[3] = Integer.toString(
Commands.RUN_COMMAND_AS_USER.getValue());
- int i = 3;
+ int i = 4;
for (String cmdArg : command) {
taskControllerCmd[i++] = cmdArg;
}
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MRConstants.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MRConstants.java Fri Apr 22 22:56:58 2011
@@ -29,6 +29,12 @@ interface MRConstants {
public static final long COUNTER_UPDATE_INTERVAL = 60 * 1000;
+ /**
+ * How often TaskTracker needs to check the health of its disks, if not
+ * configured using mapred.disk.healthChecker.interval
+ */
+ public static final long DEFAULT_DISK_HEALTH_CHECK_INTERVAL = 60 * 1000;
+
//
// Result codes
//
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Apr 22 22:56:58 2011
@@ -17,14 +17,11 @@
*/
package org.apache.hadoop.mapred;
-import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,10 +31,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
import org.apache.hadoop.util.ProcessTree.Signal;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
/**
* Controls initialization, finalization and clean up of tasks, and
@@ -64,6 +59,11 @@ public abstract class TaskController imp
protected LocalDirAllocator allocator;
+ /**
+ * LocalStorage of TaskTracker
+ */
+ protected LocalStorage localStorage;
+
final public static FsPermission TASK_LAUNCH_SCRIPT_PERMISSION =
FsPermission.createImmutable((short) 0700); // rwx--------
@@ -78,8 +78,10 @@ public abstract class TaskController imp
/**
* Does initialization and setup.
* @param allocator the local dir allocator to use
+ * @param l TaskTracker's LocalStorage object
*/
- public abstract void setup(LocalDirAllocator allocator) throws IOException;
+ public abstract void setup(LocalDirAllocator allocator,
+ LocalStorage l) throws IOException;
/**
* Create all of the directories necessary for the job to start and download
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Apr 22 22:56:58 2011
@@ -166,8 +166,96 @@ public class TaskTracker implements MRCo
volatile boolean running = true;
+ /**
+ * TaskTracker internal class only.
+ * Manages the lists of good mapred local dirs and bad mapred local dirs.
+ */
+ public static class LocalStorage {
+ private List<String> goodLocalDirs = new ArrayList<String>();
+ private List<String> badLocalDirs = new ArrayList<String>();
+ private boolean diskFailed = false;
+
+ /**
+ * TaskTracker internal only
+ */
+ public LocalStorage(String[] localDirs) {
+ for (String s : localDirs) {
+ goodLocalDirs.add(s);
+ }
+ }
+
+ /**
+ * @return good mapred local dirs list
+ */
+ synchronized String[] getGoodLocalDirs() {
+ String[] rv = new String[goodLocalDirs.size()];
+ return goodLocalDirs.toArray(rv);
+ }
+
+ /**
+ * @return good mapred local dirs list as a comma separated string
+ */
+ synchronized String getGoodLocalDirsString() {
+ StringBuffer sb = new StringBuffer();
+ for (String s : goodLocalDirs) {
+ if (sb.length() > 0) {
+ sb.append(",");
+ }
+ sb.append(s);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * @return bad mapred local dirs list
+ */
+ synchronized String[] getBadLocalDirs() {
+ String[] rv = new String[badLocalDirs.size()];
+ return badLocalDirs.toArray(rv);
+ }
+
+ /**
+ * @return true if a disk has failed since the last
+ * time this method was called
+ */
+ synchronized boolean isDiskFailed() {
+ boolean rv = diskFailed;
+ diskFailed = false;
+ return rv;
+ }
+
+ /**
+ * Check if the given local directories
+ * (and parent directories, if necessary) can be created.
+ * Updates the list of good mapred local dirs and the list of bad local
+ * dirs.
+ * @throws DiskErrorException if none of the local directories is writable
+ */
+ synchronized void checkLocalDirs()
+ throws DiskErrorException {
+ for (String s : getGoodLocalDirs()) {
+ try {
+ DiskChecker.checkDir(new File(s));
+ } catch(DiskErrorException e) {
+ LOG.warn("Task Tracker localdir error " + e.getMessage()
+ + ", removing from good locadirs");
+ goodLocalDirs.remove(s);
+ badLocalDirs.add(s);
+ diskFailed = true;
+ }
+ }
+
+ // no good local dirs ?
+ if(goodLocalDirs.size() < 1) {
+ throw new DiskErrorException(
+ "All mapred local directories are not writable.");
+ }
+ }
+ }
+
+ private LocalStorage localStorage;
+ private long lastCheckDirsTime;
private LocalDirAllocator localDirAllocator;
- private String[] localdirs;
String taskTrackerName;
String localHostname;
InetSocketAddress jobTrackAddr;
@@ -315,6 +403,19 @@ public class TaskTracker implements MRCo
*/
private NodeHealthCheckerService healthChecker;
+ /**
+ * Configuration property for disk health check interval in milliseconds.
+ * Currently, configuring this to a value smaller than the heartbeat interval
+ * is equivalent to setting this to heartbeat interval value.
+ */
+ static final String DISK_HEALTH_CHECK_INTERVAL_PROPERTY =
+ "mapred.disk.healthChecker.interval";
+ /**
+ * How often TaskTracker needs to check the health of its disks.
+ * Default value is {@link MRConstants#DEFAULT_DISK_HEALTH_CHECK_INTERVAL}
+ */
+ private long diskHealthCheckInterval;
+
/*
* A list of commitTaskActions for whom commit response has been received
*/
@@ -579,7 +680,7 @@ public class TaskTracker implements MRCo
* @throws IOException
*/
private void deleteUserDirectories(Configuration conf) throws IOException {
- for(String root: localdirs) {
+ for(String root: localStorage.getGoodLocalDirs()) {
for(FileStatus status: localFs.listStatus(new Path(root, SUBDIR))) {
String owner = status.getOwner();
String path = status.getPath().getName();
@@ -614,23 +715,34 @@ public class TaskTracker implements MRCo
(fConf.get("mapred.tasktracker.dns.interface","default"),
fConf.get("mapred.tasktracker.dns.nameserver","default"));
}
-
- //check local disk
- checkLocalDirs((localdirs = this.fConf.getLocalDirs()));
+
+ localStorage.checkLocalDirs();
+ if (localStorage.isDiskFailed()) {
+ // Ignore current disk failures. They are being handled now.
+ }
+ String dirs = localStorage.getGoodLocalDirsString();
+ fConf.setStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY, dirs);
+ LOG.info("Good mapred local directories are: " + dirs);
+ taskController.setConf(fConf);
+ // Setup task controller so that deletion of user dirs happens properly
+ taskController.setup(localDirAllocator, localStorage);
+ server.setAttribute("conf", fConf);
+
deleteUserDirectories(fConf);
+
fConf.deleteLocalFiles(SUBDIR);
final FsPermission ttdir = FsPermission.createImmutable((short) 0755);
- for (String s : localdirs) {
+ for (String s : localStorage.getGoodLocalDirs()) {
localFs.mkdirs(new Path(s, SUBDIR), ttdir);
}
fConf.deleteLocalFiles(TT_PRIVATE_DIR);
final FsPermission priv = FsPermission.createImmutable((short) 0700);
- for (String s : localdirs) {
+ for (String s : localStorage.getGoodLocalDirs()) {
localFs.mkdirs(new Path(s, TT_PRIVATE_DIR), priv);
}
fConf.deleteLocalFiles(TT_LOG_TMP_DIR);
final FsPermission pub = FsPermission.createImmutable((short) 0755);
- for (String s : localdirs) {
+ for (String s : localStorage.getGoodLocalDirs()) {
localFs.mkdirs(new Path(s, TT_LOG_TMP_DIR), pub);
}
@@ -726,7 +838,7 @@ public class TaskTracker implements MRCo
reduceLauncher.start();
// create a localizer instance
- setLocalizer(new Localizer(localFs, fConf.getLocalDirs()));
+ setLocalizer(new Localizer(localFs, localStorage.getGoodLocalDirs()));
//Start up node health checker service.
if (shouldStartHealthMonitor(this.fConf)) {
@@ -737,6 +849,13 @@ public class TaskTracker implements MRCo
fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
}
+ /**
+ * @return the LocalStorage that tracks this TaskTracker's local dirs
+ */
+ LocalStorage getLocalStorage() {
+ return localStorage;
+ }
+
private void createInstrumentation() {
Class<? extends TaskTrackerInstrumentation> metricsInst =
getInstrumentationClass(fConf);
@@ -1187,6 +1306,8 @@ public class TaskTracker implements MRCo
private void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
RunningJob rjob) throws IOException {
synchronized (tip) {
+ jobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
+ localStorage.getGoodLocalDirsString());
tip.setJobConf(jobConf);
tip.setUGI(rjob.ugi);
tip.launchTask(rjob);
@@ -1285,6 +1406,8 @@ public class TaskTracker implements MRCo
"mapred.tasktracker.map.tasks.maximum", 2);
maxReduceSlots = conf.getInt(
"mapred.tasktracker.reduce.tasks.maximum", 2);
+ diskHealthCheckInterval = conf.getLong(DISK_HEALTH_CHECK_INTERVAL_PROPERTY,
+ DEFAULT_DISK_HEALTH_CHECK_INTERVAL);
UserGroupInformation.setConfiguration(originalConf);
aclsManager = new ACLsManager(conf, new JobACLsManager(conf), null);
this.jobTrackAddr = JobTracker.getAddress(conf);
@@ -1307,9 +1430,13 @@ public class TaskTracker implements MRCo
Class<? extends TaskController> taskControllerClass =
conf.getClass("mapred.task.tracker.task-controller",
DefaultTaskController.class, TaskController.class);
- taskController =
- (TaskController) ReflectionUtils.newInstance(taskControllerClass, conf);
- taskController.setup(localDirAllocator);
+
+ fConf = new JobConf(conf);
+ localStorage = new LocalStorage(fConf.getLocalDirs());
+ localStorage.checkLocalDirs();
+ taskController =
+ (TaskController) ReflectionUtils.newInstance(taskControllerClass, fConf);
+ taskController.setup(localDirAllocator, localStorage);
// create user log manager
setUserLogManager(new UserLogManager(conf, taskController));
@@ -1319,7 +1446,7 @@ public class TaskTracker implements MRCo
this.shuffleServerMetrics = ShuffleServerInstrumentation.create(this);
server.setAttribute("task.tracker", this);
server.setAttribute("local.file.system", local);
- server.setAttribute("conf", conf);
+
server.setAttribute("log", LOG);
server.setAttribute("localDirAllocator", localDirAllocator);
server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
@@ -1448,7 +1575,17 @@ public class TaskTracker implements MRCo
systemDirectory = new Path(dir);
systemFS = systemDirectory.getFileSystem(fConf);
}
-
+
+ now = System.currentTimeMillis();
+ if (now > (lastCheckDirsTime + diskHealthCheckInterval)) {
+ localStorage.checkLocalDirs();
+ lastCheckDirsTime = now;
+ // If any of the good disks failed, re-init the task tracker
+ if (localStorage.isDiskFailed()) {
+ return State.STALE;
+ }
+ }
+
// Send the heartbeat and process the jobtracker's directives
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
@@ -1456,7 +1593,6 @@ public class TaskTracker implements MRCo
// next heartbeat
lastHeartbeat = System.currentTimeMillis();
-
// Check if the map-event list needs purging
Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
if (jobs.size() > 0) {
@@ -1615,7 +1751,6 @@ public class TaskTracker implements MRCo
localMinSpaceStart = minSpaceStart;
}
if (askForNewTask) {
- checkLocalDirs(fConf.getLocalDirs());
askForNewTask = enoughFreeSpace(localMinSpaceStart);
long freeDiskSpace = getFreeSpace();
long totVmem = getTotalVirtualMemoryOnTT();
@@ -1832,7 +1967,7 @@ public class TaskTracker implements MRCo
jobDir.substring(userDir.length()));
directoryCleanupThread.addToQueue(jobCleanup);
- for (String str : localdirs) {
+ for (String str : localStorage.getGoodLocalDirs()) {
Path ttPrivateJobDir = FileSystem.getLocal(fConf).makeQualified(
new Path(str, TaskTracker.getPrivateDirForJob(user, jobId.toString())));
PathDeletionContext ttPrivateJobCleanup =
@@ -1954,7 +2089,7 @@ public class TaskTracker implements MRCo
private long getFreeSpace() throws IOException {
long biggestSeenSoFar = 0;
- String[] localDirs = fConf.getLocalDirs();
+ String[] localDirs = localStorage.getGoodLocalDirs();
for (int i = 0; i < localDirs.length; i++) {
DF df = null;
if (localDirsDf.containsKey(localDirs[i])) {
@@ -3413,32 +3548,6 @@ public class TaskTracker implements MRCo
}
/**
- * Check if the given local directories
- * (and parent directories, if necessary) can be created.
- * @param localDirs where the new TaskTracker should keep its local files.
- * @throws DiskErrorException if all local directories are not writable
- */
- private static void checkLocalDirs(String[] localDirs)
- throws DiskErrorException {
- boolean writable = false;
-
- if (localDirs != null) {
- for (int i = 0; i < localDirs.length; i++) {
- try {
- DiskChecker.checkDir(new File(localDirs[i]));
- writable = true;
- } catch(DiskErrorException e) {
- LOG.warn("Task Tracker local " + e.getMessage());
- }
- }
- }
-
- if (!writable)
- throw new DiskErrorException(
- "all local directories are not writable");
- }
-
- /**
* Is this task tracker idle?
* @return has this task tracker finished and cleaned up all of its tasks?
*/
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Fri Apr 22 22:56:58 2011
@@ -35,10 +35,12 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.TaskDistributedCacheManager.CacheFile;
import org.apache.hadoop.mapred.DefaultTaskController;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobLocalizer;
import org.apache.hadoop.mapred.TaskController;
import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
@@ -111,7 +113,8 @@ public class TestTrackerDistributedCache
taskControllerClass, conf);
// setup permissions for mapred local dir
- taskController.setup(localDirAllocator);
+ taskController.setup(localDirAllocator,
+ new LocalStorage(conf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY)));
// Create the temporary cache files to be used in the tests.
firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
@@ -126,7 +129,8 @@ public class TestTrackerDistributedCache
protected void refreshConf(Configuration conf) throws IOException {
taskController.setConf(conf);
- taskController.setup(localDirAllocator);
+ taskController.setup(localDirAllocator,
+ new LocalStorage(conf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY)));
}
/**
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Fri Apr 22 22:56:58 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
import org.apache.hadoop.security.UserGroupInformation;
import junit.framework.TestCase;
@@ -75,14 +76,15 @@ public class ClusterWithLinuxTaskControl
+ "/task-controller";
@Override
- public void setup(LocalDirAllocator allocator) throws IOException {
+ public void setup(LocalDirAllocator allocator, LocalStorage l)
+ throws IOException {
// get the current ugi and set the task controller group owner
getConf().set(TT_GROUP, taskTrackerSpecialGroup);
// write configuration file
configurationFile = createTaskControllerConf(System
.getProperty(TASKCONTROLLER_PATH), getConf());
- super.setup(allocator);
+ super.setup(allocator, l);
}
protected String getTaskControllerExecutablePath() {
@@ -209,8 +211,8 @@ public class ClusterWithLinuxTaskControl
PrintWriter writer =
new PrintWriter(new FileOutputStream(configurationFile));
- writer.println(String.format("mapred.local.dir=%s", conf.
- get(JobConf.MAPRED_LOCAL_DIR_PROPERTY)));
+ //writer.println(String.format("mapred.local.dir=%s", conf.
+ // get(JobConf.MAPRED_LOCAL_DIR_PROPERTY)));
writer
.println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir()));
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJvmManager.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJvmManager.java Fri Apr 22 22:56:58 2011
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.LocalDirAllo
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JvmManager.JvmManagerForType;
import org.apache.hadoop.mapred.JvmManager.JvmManagerForType.JvmRunner;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
import org.apache.hadoop.mapred.TaskTracker.RunningJob;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
@@ -75,8 +76,9 @@ public class TestJvmManager {
tt.setTaskController((dtc = new DefaultTaskController()));
Configuration conf = new Configuration();
dtc.setConf(conf);
- LocalDirAllocator ldirAlloc = new LocalDirAllocator("mapred.local.dir");
- tt.getTaskController().setup(ldirAlloc);
+ LocalDirAllocator ldirAlloc =
+ new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
+ tt.getTaskController().setup(ldirAlloc, new LocalStorage(ttConf.getLocalDirs()));
JobID jobId = new JobID("test", 0);
jvmManager = new JvmManager(tt);
tt.setJvmManagerInstance(jvmManager);
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java Fri Apr 22 22:56:58 2011
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
import junit.framework.TestCase;
@@ -51,7 +52,10 @@ public class TestLinuxTaskController ext
// task controller setup should fail validating permissions.
Throwable th = null;
try {
- controller.setup(new LocalDirAllocator("mapred.local.dir"));
+ controller.setup(
+ new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY),
+ new LocalStorage(controller.getConf().getStrings(
+ JobConf.MAPRED_LOCAL_DIR_PROPERTY)));
} catch (IOException ie) {
th = ie;
}
@@ -60,7 +64,9 @@ public class TestLinuxTaskController ext
+ INVALID_TASKCONTROLLER_PERMISSIONS, th.getMessage().contains(
"with exit code " + INVALID_TASKCONTROLLER_PERMISSIONS));
} else {
- controller.setup(new LocalDirAllocator("mapred.local.dir"));
+ controller.setup(new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY),
+ new LocalStorage(controller.getConf().getStrings(
+ JobConf.MAPRED_LOCAL_DIR_PROPERTY)));
}
}
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Apr 22 22:56:58 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.LocalDirAllo
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.mapred.QueueManager.QueueACL;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
import org.apache.hadoop.mapred.TaskTracker.RunningJob;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
@@ -187,7 +188,8 @@ public class TestTaskTrackerLocalization
// setup task controller
taskController = getTaskController();
taskController.setConf(trackerFConf);
- taskController.setup(lDirAlloc);
+ taskController.setup(lDirAlloc,
+ new LocalStorage(trackerFConf.getLocalDirs()));
tracker.setTaskController(taskController);
tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(),localDirs));
}
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Fri Apr 22 22:56:58 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.filecache.TestTrackerDistributedCacheManager;
@@ -65,7 +66,9 @@ public class TestTrackerDistributedCache
String execPath = path + "/task-controller";
((MyLinuxTaskController)taskController).setTaskControllerExe(execPath);
taskController.setConf(conf);
- taskController.setup(new LocalDirAllocator("mapred.local.dir"));
+ taskController.setup(
+ new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY),
+ new LocalStorage(conf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY)));
}
@Override