You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/04/23 01:13:23 UTC
svn commit: r1096081 - in /hadoop/common/branches/branch-0.20-security: ./
src/c++/task-controller/impl/ src/c++/task-controller/test/
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Author: omalley
Date: Fri Apr 22 23:13:23 2011
New Revision: 1096081
URL: http://svn.apache.org/viewvc?rev=1096081&view=rev
Log:
MAPREDUCE-2415. Distribute the user task logs on to multiple disks.
(Bharath Mundlapudi via omalley)
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c
hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c
hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h
hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Apr 22 23:13:23 2011
@@ -61,15 +61,18 @@ Release 0.20.204.0 - unreleased
IMPROVEMENTS
+ MAPREDUCE-2415. Distribute the user task logs on to multiple disks.
+ (Bharath Mundlapudi via omalley)
+
+ MAPREDUCE-2413. TaskTracker should handle disk failures by reinitializing
+ itself. (Ravi Gummadi and Jagane Sundar via omalley)
+
HDFS-1541. Not marking datanodes dead when namenode in safemode.
(hairong)
HDFS-1767. Namenode ignores non-initial block report from datanodes
when in safemode during startup. (Matt Foley via suresh)
- MAPREDUCE-2413. TaskTracker should handle disk failures by reinitializing
- itself. (Ravi Gummadi and Jagane Sundar via omalley)
-
Release 0.20.203.0 - unreleased
HADOOP-7190. Add metrics v1 back for backwards compatibility. (omalley)
Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/main.c?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c Fri Apr 22 23:13:23 2011
@@ -192,7 +192,7 @@ int main(int argc, char **argv) {
break;
case DELETE_LOG_AS_USER:
dir_to_be_deleted = argv[optind++];
- exit_code= delete_log_directory(dir_to_be_deleted);
+ exit_code= delete_log_directory(dir_to_be_deleted, good_local_dirs);
break;
case RUN_COMMAND_AS_USER:
exit_code = run_command_as_user(user_detail->pw_name, argv + optind);
Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/task-controller.c?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c Fri Apr 22 23:13:23 2011
@@ -28,9 +28,11 @@
#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
+#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
+#include <sys/time.h>
#define USER_DIR_PATTERN "%s/taskTracker/%s"
@@ -54,6 +56,8 @@ static const int DEFAULT_MIN_USERID = 10
#define BANNED_USERS_KEY "banned.users"
+#define USERLOGS "userlogs"
+
static const char* DEFAULT_BANNED_USERS[] = {"mapred", "hdfs", "bin", 0};
//struct to store the user details
@@ -355,11 +359,51 @@ int mkdirs(const char* path, mode_t perm
return 0;
}
+static short get_current_local_dir_count(char **local_dir)
+{
+ char **local_dir_ptr;
+ short count=0;
+
+ for(local_dir_ptr = local_dir; *local_dir_ptr != NULL; ++local_dir_ptr) {
+ ++count;
+ }
+ return count;
+}
+
+static char* get_nth_local_dir(char **local_dir, int nth)
+{
+ char **local_dir_ptr;
+ short count=0;
+
+ for(local_dir_ptr = local_dir; *local_dir_ptr != NULL; ++local_dir_ptr) {
+ if(count == nth) {
+ return strdup(*local_dir_ptr);
+ }
+ ++count;
+ }
+ fprintf(LOGFILE, "Invalid index %d for %d local directories\n", nth, count);
+ return NULL;
+}
+
+static char* get_random_local_dir(char **local_dir) {
+ struct timeval tv;
+ short nth;
+ gettimeofday(&tv, NULL);
+ srand ( (long) tv.tv_sec*1000000 + tv.tv_usec );
+ short cnt = get_current_local_dir_count(local_dir);
+ if(cnt == 0) {
+ fprintf(LOGFILE, "No valid local directories\n");
+ return NULL;
+ }
+ nth = rand() % cnt;
+ return get_nth_local_dir(local_dir, nth);
+}
+
/**
* Function to prepare the attempt directories for the task JVM.
* It creates the task work and log directories.
*/
-static int create_attempt_directories(const char* user,
+int create_attempt_directories(const char* user,
const char * good_local_dirs, const char *job_id, const char *task_id) {
// create dirs as 0750
const mode_t perms = S_IRWXU | S_IRGRP | S_IXGRP;
@@ -392,24 +436,67 @@ static int create_attempt_directories(co
free(task_dir);
}
}
- free_values(local_dir);
// also make the directory for the task logs
char *job_task_name = malloc(strlen(job_id) + strlen(task_id) + 2);
+ char *real_task_dir = NULL; // target of symlink
+ char *real_job_dir = NULL; // parent dir of target of symlink
+ char *random_local_dir = NULL;
+ char *link_task_log_dir = NULL; // symlink
if (job_task_name == NULL) {
fprintf(LOGFILE, "Malloc of job task name failed\n");
result = -1;
} else {
sprintf(job_task_name, "%s/%s", job_id, task_id);
- char *log_dir = get_job_log_directory(job_task_name);
- free(job_task_name);
- if (log_dir == NULL) {
+ link_task_log_dir = get_job_log_directory(job_task_name);
+ random_local_dir = get_random_local_dir(local_dir);
+ if(random_local_dir == NULL) {
+ result = -1;
+ goto cleanup;
+ }
+ real_job_dir = malloc(strlen(random_local_dir) + strlen(USERLOGS) +
+ strlen(job_id) + 3);
+ if (real_job_dir == NULL) {
+ fprintf(LOGFILE, "Malloc of real job directory failed\n");
result = -1;
- } else if (mkdirs(log_dir, perms) != 0) {
+ goto cleanup;
+ }
+ real_task_dir = malloc(strlen(random_local_dir) + strlen(USERLOGS) +
+ strlen(job_id) + strlen(task_id) + 4);
+ if (real_task_dir == NULL) {
+ fprintf(LOGFILE, "Malloc of real task directory failed\n");
+ result = -1;
+ goto cleanup;
+ }
+ sprintf(real_job_dir, "%s/userlogs/%s", random_local_dir, job_id);
+ result = create_directory_for_user(real_job_dir);
+ if( result != 0) {
+ result = -1;
+ goto cleanup;
+ }
+ sprintf(real_task_dir, "%s/userlogs/%s/%s",
+ random_local_dir, job_id, task_id);
+ result = mkdirs(real_task_dir, perms);
+ if( result != 0) {
+ result = -1;
+ goto cleanup;
+ }
+ result = symlink(real_task_dir, link_task_log_dir);
+ if( result != 0) {
+ fprintf(LOGFILE, "Failed to create symlink %s to %s - %s\n",
+ link_task_log_dir, real_task_dir, strerror(errno));
result = -1;
}
- free(log_dir);
}
+
+ cleanup:
+ free(random_local_dir);
+ free(job_task_name);
+ free(link_task_log_dir);
+ free(real_job_dir);
+ free(real_task_dir);
+ free_values(local_dir);
+
return result;
}
@@ -523,7 +610,7 @@ static int change_owner(const char* path
/**
* Create a top level directory for the user.
* It assumes that the parent directory is *not* writable by the user.
- * It creates directories with 02700 permissions owned by the user
+ * It creates directories with 02750 permissions owned by the user
* and with the group set to the task tracker group.
* return non-0 on failure
*/
@@ -1036,17 +1123,38 @@ int delete_as_user(const char *user, con
return ret;
}
-/**
- * delete a given log directory
+/*
+ * delete a given job log directory
+ * This function takes jobid and deletes the related logs.
*/
-int delete_log_directory(const char *subdir) {
- char* log_subdir = get_job_log_directory(subdir);
+int delete_log_directory(const char *subdir, const char * good_local_dirs) {
+ char* job_log_dir = get_job_log_directory(subdir);
+
int ret = -1;
- if (log_subdir != NULL) {
- ret = delete_path(log_subdir, strchr(subdir, '/') == NULL);
+ if (job_log_dir == NULL) return ret;
+
+ //delete the job log directory in <hadoop.log.dir>/userlogs/jobid
+ delete_path(job_log_dir, true);
+
+ char **local_dir = get_mapred_local_dirs(good_local_dirs);
+
+ char **local_dir_ptr;
+ for(local_dir_ptr = local_dir; *local_dir_ptr != NULL; ++local_dir_ptr) {
+ char *mapred_local_log_dir = concatenate("%s/userlogs/%s",
+ "mapred local job log dir",
+ 2, *local_dir_ptr, subdir);
+ if (mapred_local_log_dir != NULL) {
+ //delete the job log directory in <mapred.local.dir>/userlogs/jobid
+ delete_path(mapred_local_log_dir, true);
+ free(mapred_local_log_dir);
+ }
+ else
+ fprintf(LOGFILE, "Failed to delete mapred local log dir for jobid %s\n",
+ subdir);
}
- free(log_subdir);
- return ret;
+ free(job_log_dir);
+ free_values(local_dir);
+ return 0;
}
/**
Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/task-controller.h?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h Fri Apr 22 23:13:23 2011
@@ -71,7 +71,7 @@ int check_taskcontroller_permissions(cha
/**
* delete a given log directory as a user
*/
-int delete_log_directory(const char *log_dir);
+int delete_log_directory(const char *log_dir, const char * good_local_dirs);
// initialize the job directory
int initialize_job(const char *user, const char * good_local_dirs, const char *jobid,
@@ -153,3 +153,9 @@ int initialize_user(const char *user, co
int create_directory_for_user(const char* path);
int change_user(uid_t user, gid_t group);
+
+/**
+ * Create task attempt related directories as user.
+ */
+int create_attempt_directories(const char* user,
+ const char * good_local_dirs, const char *job_id, const char *task_id);
Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/test/test-task-controller.c?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c Fri Apr 22 23:13:23 2011
@@ -24,9 +24,11 @@
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
+#include <stdbool.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/wait.h>
+#include <limits.h>
#define TEST_ROOT "/tmp/test-task-controller"
#define DONT_TOUCH_FILE "dont-touch-me"
@@ -185,6 +187,78 @@ void test_get_task_log_dir() {
free(logdir);
}
+void create_userlogs_dir() {
+ char** tt_roots = get_values("mapred.local.dir");
+ char** tt_root;
+ for(tt_root=tt_roots; *tt_root != NULL; ++tt_root) {
+ char buffer[100000];
+ sprintf(buffer, "%s/userlogs", *tt_root);
+ if (mkdir(buffer, 0755) != 0) {
+ printf("FAIL: Can't create directory %s - %s\n", buffer,
+ strerror(errno));
+ exit(1);
+ }
+ }
+ free_values(tt_roots);
+}
+
+void test_create_log_directory() {
+ printf("\nTesting test_create_log_directory\n");
+ create_userlogs_dir();
+ char *job_log_dir = get_job_log_directory("job_7");
+ if (job_log_dir == NULL) {
+ exit(1);
+ }
+ if (create_directory_for_user(job_log_dir) != 0) {
+ exit(1);
+ }
+ free(job_log_dir);
+ char* good_local_dirs = get_value("mapred.local.dir");
+ if (good_local_dirs == NULL) {
+ fprintf(LOGFILE, "Mapred local directories could not be obtained.\n");
+ exit(1);
+ }
+ create_attempt_directories(username, good_local_dirs, "job_7", "task_1");
+
+ //check if symlink got created
+ struct stat file;
+ int status;
+ char actualpath [PATH_MAX+1];
+ char *res;
+ char *filepath = TEST_ROOT "/logs/userlogs/job_7/task_1";
+
+ status = lstat(filepath, &file);
+ if (!S_ISLNK(file.st_mode)) {
+ fprintf(LOGFILE, "Symlink creation failed\n");
+ exit(1);
+ }
+
+ //Check if symlink path exists
+ res = realpath(filepath, actualpath);
+ if(!res) {
+ fprintf(LOGFILE, "Failed to get target for the symlink\n");
+ exit(1);
+ }
+
+ char local_job_dir[PATH_MAX+1];
+ int i;
+ bool found = false;
+ for(i=1; i<5; i++) {
+ sprintf(local_job_dir, TEST_ROOT "/local-%d/userlogs/job_7/task_1", i);
+ if (strcmp(local_job_dir, actualpath) == 0) {
+ found = true;
+ break;
+ }
+ }
+
+ if(!found) {
+ printf("FAIL: symlink path and target path mismatch\n");
+ exit(1);
+ }
+
+ free(good_local_dirs);
+}
+
void test_check_user() {
printf("\nTesting test_check_user\n");
struct passwd *user = check_user(username);
@@ -220,8 +294,9 @@ void test_check_configuration_permission
}
void test_delete_task() {
- if (initialize_user(username)) {
- printf("FAIL: failed to initialized user %s\n", username);
+ char* local_dirs = get_value("mapred.local.dir");
+ if (initialize_user(username, local_dirs)) {
+ printf("FAIL: failed to initialize user %s\n", username);
exit(1);
}
char* job_dir = get_job_directory(TEST_ROOT "/local-2", username, "job_1");
@@ -254,7 +329,7 @@ void test_delete_task() {
run(buffer);
// delete task directory
- int ret = delete_as_user(username, "jobcache/job_1/task_1");
+ int ret = delete_as_user(username, local_dirs, "jobcache/job_1/task_1");
if (ret != 0) {
printf("FAIL: return code from delete_as_user is %d\n", ret);
exit(1);
@@ -282,9 +357,11 @@ void test_delete_task() {
free(job_dir);
free(task_dir);
free(dont_touch);
+ free(local_dirs);
}
void test_delete_job() {
+ char* local_dirs = get_value("mapred.local.dir");
char* job_dir = get_job_directory(TEST_ROOT "/local-2", username, "job_2");
char* dont_touch = get_job_directory(TEST_ROOT "/local-2", username,
DONT_TOUCH_FILE);
@@ -315,7 +392,7 @@ void test_delete_job() {
run(buffer);
// delete task directory
- int ret = delete_as_user(username, "jobcache/job_2");
+ int ret = delete_as_user(username, local_dirs, "jobcache/job_2");
if (ret != 0) {
printf("FAIL: return code from delete_as_user is %d\n", ret);
exit(1);
@@ -339,11 +416,13 @@ void test_delete_job() {
free(job_dir);
free(task_dir);
free(dont_touch);
+ free(local_dirs);
}
void test_delete_user() {
printf("\nTesting delete_user\n");
+ char* local_dirs = get_value("mapred.local.dir");
char* job_dir = get_job_directory(TEST_ROOT "/local-1", username, "job_3");
if (mkdirs(job_dir, 0700) != 0) {
exit(1);
@@ -354,7 +433,7 @@ void test_delete_user() {
printf("FAIL: directory missing before test\n");
exit(1);
}
- if (delete_as_user(username, "") != 0) {
+ if (delete_as_user(username, local_dirs, "") != 0) {
exit(1);
}
if (access(buffer, R_OK) == 0) {
@@ -366,10 +445,12 @@ void test_delete_user() {
exit(1);
}
free(job_dir);
+ free(local_dirs);
}
void test_delete_log_directory() {
printf("\nTesting delete_log_directory\n");
+ char* local_dirs = get_value("mapred.local.dir");
char *job_log_dir = get_job_log_directory("job_1");
if (job_log_dir == NULL) {
exit(1);
@@ -389,7 +470,7 @@ void test_delete_log_directory() {
printf("FAIL: can't access task directory - %s\n", strerror(errno));
exit(1);
}
- if (delete_log_directory("job_1/task_2") != 0) {
+ if (delete_log_directory("job_1/task_2", local_dirs) != 0) {
printf("FAIL: can't delete task directory\n");
exit(1);
}
@@ -401,7 +482,7 @@ void test_delete_log_directory() {
printf("FAIL: job directory not deleted - %s\n", strerror(errno));
exit(1);
}
- if (delete_log_directory("job_1") != 0) {
+ if (delete_log_directory("job_1", local_dirs) != 0) {
printf("FAIL: can't delete task directory\n");
exit(1);
}
@@ -409,7 +490,25 @@ void test_delete_log_directory() {
printf("FAIL: job directory not deleted\n");
exit(1);
}
+ if (delete_log_directory("job_7", local_dirs) != 0) {
+ printf("FAIL: can't delete job directory\n");
+ exit(1);
+ }
+ if (access(TEST_ROOT "/logs/userlogs/job_7", R_OK) == 0) {
+ printf("FAIL: job log directory not deleted\n");
+ exit(1);
+ }
+ char local_job_dir[PATH_MAX+1];
+ int i;
+ for(i=1; i<5; i++) {
+ sprintf(local_job_dir, TEST_ROOT "/local-%d/userlogs/job_7", i);
+ if (access(local_job_dir, R_OK) == 0) {
+ printf("FAIL: job log directory in mapred local not deleted\n");
+ exit(1);
+ }
+ }
free(task_log_dir);
+ free(local_dirs);
}
void run_test_in_child(const char* test_name, void (*func)()) {
@@ -558,7 +657,8 @@ void test_init_job() {
exit(1);
} else if (child == 0) {
char *final_pgm[] = {"touch", "my-touch-file", 0};
- if (initialize_job(username, "job_4", TEST_ROOT "/creds.txt",
+ char* local_dirs = get_value("mapred.local.dir");
+ if (initialize_job(username, local_dirs, "job_4", TEST_ROOT "/creds.txt",
TEST_ROOT "/job.xml", final_pgm) != 0) {
printf("FAIL: failed in child\n");
exit(42);
@@ -631,13 +731,14 @@ void test_run_task() {
fflush(stderr);
char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-1",
username, "job_4", "task_1");
+ char* local_dirs = get_value("mapred.local.dir");
pid_t child = fork();
if (child == -1) {
printf("FAIL: failed to fork process for init_job - %s\n",
strerror(errno));
exit(1);
} else if (child == 0) {
- if (run_task_as_user(username, "", "job_4", "task_1",
+ if (run_task_as_user(username, local_dirs, "job_4", "task_1",
task_dir, script_name) != 0) {
printf("FAIL: failed in child\n");
exit(42);
@@ -736,6 +837,8 @@ int main(int argc, char **argv) {
test_check_user();
+ test_create_log_directory();
+
test_delete_log_directory();
// the tests that change user need to be run in a subshell, so that
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Apr 22 23:13:23 2011
@@ -65,6 +65,17 @@ public class DefaultTaskController exten
}
}
+ @Override
+ public void createLogDir(TaskAttemptID taskID,
+ boolean isCleanup) throws IOException {
+ boolean b = TaskLog.createTaskAttemptLogDir(taskID, isCleanup,
+ localStorage.getGoodLocalDirs());
+ if (!b) {
+ LOG.warn("Creation of attempt log dir for " + taskID
+ + " failed. Ignoring");
+ }
+ }
+
/**
* Create all of the directories for the task and launches the child jvm.
* @param user the user name
@@ -80,9 +91,8 @@ public class DefaultTaskController exten
File currentWorkDirectory,
String stdout,
String stderr) throws IOException {
-
ShellCommandExecutor shExec = null;
- try {
+ try {
FileSystem localFs = FileSystem.getLocal(getConf());
//create the attempt dirs
@@ -232,7 +242,24 @@ public class DefaultTaskController exten
public void deleteLogAsUser(String user,
String subDir) throws IOException {
Path dir = new Path(TaskLog.getUserLogDir().getAbsolutePath(), subDir);
- fs.delete(dir, true);
+ //Delete the subDir in <hadoop.log.dir>/userlogs
+ File subDirPath = new File(dir.toString());
+ FileUtil.fullyDelete( subDirPath );
+
+ //Delete the subDir in all good <mapred.local.dirs>/userlogs
+ String [] localDirs = localStorage.getGoodLocalDirs();
+ for(String localdir : localDirs) {
+ String dirPath = localdir + File.separatorChar +
+ TaskLog.USERLOGS_DIR_NAME + File.separatorChar +
+ subDir;
+ try {
+ FileUtil.fullyDelete( new File(dirPath) );
+ } catch(Exception e){
+ //Skip bad dir for later deletion
+ LOG.warn("Could not delete dir: " + dirPath +
+ " , Reason : " + e.getMessage());
+ }
+ }
}
@Override
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Apr 22 23:13:23 2011
@@ -71,6 +71,10 @@ public abstract class TaskController imp
return conf;
}
+ public LocalStorage getLocalStorage() {
+ return localStorage;
+ }
+
public void setConf(Configuration conf) {
this.conf = conf;
}
@@ -143,6 +147,17 @@ public abstract class TaskController imp
*/
public abstract void deleteAsUser(String user,
String subDir) throws IOException;
+
+ /**
+ * Creates task log dir
+ * @param taskID ID of the task
+ * @param isCleanup If the task is cleanup task or not
+ * @throws IOException
+ */
+ public void createLogDir(TaskAttemptID taskID,
+ boolean isCleanup) throws IOException {
+
+ }
/**
* Delete the user's files under the userlogs directory.
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Apr 22 23:13:23 2011
@@ -33,6 +33,7 @@ import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -43,8 +44,10 @@ import org.apache.hadoop.fs.LocalFileSys
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
@@ -75,7 +78,103 @@ public class TaskLog {
LOG_DIR.mkdirs();
}
}
+
+ static AtomicInteger rotor = new AtomicInteger(0);
+
+ /**
+ * Create log directory for the given attempt. This involves creating the
+ * following and setting proper permissions for the new directories
+ * <br>{hadoop.log.dir}/userlogs/<jobid>
+ * <br>{hadoop.log.dir}/userlogs/<jobid>/<attempt-id-as-symlink>
+ * <br>{one of the mapred-local-dirs}/userlogs/<jobid>
+ * <br>{one of the mapred-local-dirs}/userlogs/<jobid>/<attempt-id>
+ *
+ * @param taskID attempt-id for which log dir is to be created
+ * @param isCleanup Is this attempt a cleanup attempt ?
+ * @param localDirs mapred local directories
+ * @return true if attempt log directory creation is succeeded
+ * @throws IOException
+ */
+ public static boolean createTaskAttemptLogDir(TaskAttemptID taskID,
+ boolean isCleanup, String[] localDirs) throws IOException{
+ String cleanupSuffix = isCleanup ? ".cleanup" : "";
+ String strAttemptLogDir = getTaskAttemptLogDir(taskID,
+ cleanupSuffix, localDirs);
+ File attemptLogDir = new File(strAttemptLogDir);
+ boolean isSucceeded = attemptLogDir.mkdirs();
+ if(isSucceeded) {
+ String strLinkAttemptLogDir = getJobDir(
+ taskID.getJobID()).getAbsolutePath() + File.separatorChar +
+ taskID.toString() + cleanupSuffix;
+ if (FileUtil.symLink(strAttemptLogDir, strLinkAttemptLogDir) != 0) {
+ LOG.warn("Creation of symlink to attempt log dir failed.");
+ isSucceeded = false;
+ }
+
+ File linkAttemptLogDir = new File(strLinkAttemptLogDir);
+ // Set permissions for job dir in userlogs
+ if (!Localizer.PermissionsHandler.setPermissions(
+ linkAttemptLogDir.getParentFile(),
+ Localizer.PermissionsHandler.sevenZeroZero)) {
+ LOG.warn("Setting permissions to "
+ + linkAttemptLogDir.getParentFile() + " failed.");
+ isSucceeded = false;
+ }
+ //Set permissions for target attempt log dir
+ if (!Localizer.PermissionsHandler.setPermissions(attemptLogDir,
+ Localizer.PermissionsHandler.sevenZeroZero)) {
+ LOG.warn("Setting permissions to the real attempt log dir "
+ + attemptLogDir + " failed.");
+ isSucceeded = false;
+ }
+ //Set permissions for target job log dir
+ if (!Localizer.PermissionsHandler.setPermissions(
+ attemptLogDir.getParentFile(),
+ Localizer.PermissionsHandler.sevenZeroZero)) {
+ LOG.warn("Setting permissions to the real job log dir "
+ + attemptLogDir.getParentFile() + " failed.");
+ isSucceeded = false;
+ }
+ }
+ return isSucceeded;
+ }
+
+ /**
+ * Get one of the mapred local directory in a round-robin-way.
+ * @param localDirs mapred local directories
+ * @return the next chosen mapred local directory
+ * @throws IOException
+ */
+ private static String getNextLocalDir(String[] localDirs) throws IOException{
+ if(localDirs.length == 0) {
+ throw new IOException ("Not enough mapred.local.dirs ("
+ + localDirs.length + ")");
+ }
+ return localDirs[Math.abs(rotor.getAndIncrement()) % localDirs.length];
+ }
+ /**
+ * Get attempt log directory path for the given attempt-id under randomly
+ * selected mapred local directory.
+ * @param taskID attempt-id for which log dir path is needed
+ * @param cleanupSuffix ".cleanup" if this attempt is a cleanup attempt
+ * @param localDirs mapred local directories
+ * @return target task attempt log directory
+ * @throws IOException
+ */
+ public static String getTaskAttemptLogDir(TaskAttemptID taskID,
+ String cleanupSuffix, String[] localDirs) throws IOException {
+ StringBuilder taskLogDirLocation = new StringBuilder();
+ taskLogDirLocation.append(getNextLocalDir(localDirs));
+ taskLogDirLocation.append(File.separatorChar);
+ taskLogDirLocation.append(USERLOGS_DIR_NAME);
+ taskLogDirLocation.append(File.separatorChar);
+ taskLogDirLocation.append(taskID.getJobID().toString());
+ taskLogDirLocation.append(File.separatorChar);
+ taskLogDirLocation.append(taskID.toString()+cleanupSuffix);
+ return taskLogDirLocation.toString();
+ }
+
public static File getTaskLogFile(TaskAttemptID taskid, boolean isCleanup,
LogName filter) {
return new File(getAttemptDir(taskid, isCleanup), filter.toString());
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Apr 22 23:13:23 2011
@@ -313,15 +313,7 @@ abstract class TaskRunner extends Thread
TaskLog.LogName.STDOUT);
logFiles[1] = TaskLog.getTaskLogFile(taskid, isCleanup,
TaskLog.LogName.STDERR);
- File logDir = logFiles[0].getParentFile();
- boolean b = logDir.mkdirs();
- if (!b) {
- LOG.warn("mkdirs failed. Ignoring");
- } else {
- Localizer.PermissionsHandler.setPermissions(logDir,
- Localizer.PermissionsHandler.sevenZeroZero);
- }
-
+ getTracker().getTaskController().createLogDir(taskid, isCleanup);
return logFiles;
}
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Apr 22 23:13:23 2011
@@ -745,7 +745,13 @@ public class TaskTracker implements MRCo
for (String s : localStorage.getGoodLocalDirs()) {
localFs.mkdirs(new Path(s, TT_LOG_TMP_DIR), pub);
}
-
+ // Create userlogs directory under all good mapred-local-dirs
+ for (String s : localStorage.getGoodLocalDirs()) {
+ Path userLogsDir = new Path(s, TaskLog.USERLOGS_DIR_NAME);
+ if (!localFs.exists(userLogsDir)) {
+ localFs.mkdirs(userLogsDir, pub);
+ }
+ }
// Clear out state tables
this.tasks.clear();
this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
@@ -905,7 +911,9 @@ public class TaskTracker implements MRCo
* startup, to remove any leftovers from previous run.
*/
public void cleanupStorage() throws IOException {
- this.fConf.deleteLocalFiles();
+ this.fConf.deleteLocalFiles(SUBDIR);
+ this.fConf.deleteLocalFiles(TT_PRIVATE_DIR);
+ this.fConf.deleteLocalFiles(TT_LOG_TMP_DIR);
}
// Object on wait which MapEventsFetcherThread is going to wait.
@@ -1396,6 +1404,14 @@ public class TaskTracker implements MRCo
fConf = conf;
}
+ void setLocalStorage(LocalStorage in) {
+ localStorage = in;
+ }
+
+ void setLocalDirAllocator(LocalDirAllocator in) {
+ localDirAllocator = in;
+ }
+
/**
* Start with the local machine name, and the default JobTracker
*/
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java Fri Apr 22 23:13:23 2011
@@ -123,36 +123,52 @@ public class UserLogCleaner extends Thre
}
/**
- * Clears all the logs in userlog directory.
+ * Adds the job log directories found under {@code loc} for deletion with
+ * the default retain hours. Deletes every entry whose name is not a job id.
*
- * Adds the job directories for deletion with default retain hours. Deletes
- * all other directories, if any. This is usually called on reinit/restart of
- * the TaskTracker
+ * @param loc userlogs directory to scan; nothing is done if it does not exist
+ * @param conf configuration from which the userlog retain hours are read
+ * @throws IOException if deleting a non-job entry fails
+ */
+ public void addOldUserLogsForDeletion(File loc, Configuration conf)
+ throws IOException {
+ if (loc.exists()) {
+ long now = clock.getTime();
+ for(String logDir: loc.list()) {
+ // handle every entry under loc. NOTE(review): File.list() returns null if loc is not a directory or on an I/O error; that is not checked here.
+ JobID jobid = null;
+ try {
+ jobid = JobID.forName(logDir);
+ } catch (IllegalArgumentException ie) {
+ deleteLogPath(logDir);
+ continue;
+ }
+ // hand the job log directory to userLogManager for deletion after the
+ // default retain hours, unless the job is already in completedJobs
+ if (!completedJobs.containsKey(jobid)) {
+ JobCompletedEvent jce =
+ new JobCompletedEvent(jobid, now,getUserlogRetainHours(conf));
+ userLogManager.addLogEvent(jce);
+ }
+ }
+ }
+ }
+
+ /**
+ * Schedules old user logs for deletion, under TaskLog.getUserLogDir() and under the userlogs directory of each configured
+ * mapred local dir. This is usually called on reinit/restart of the TaskTracker.
*
* @param conf
* @throws IOException
*/
public void clearOldUserLogs(Configuration conf) throws IOException {
File userLogDir = TaskLog.getUserLogDir();
- if (userLogDir.exists()) {
- long now = clock.getTime();
- for(String logDir: userLogDir.list()) {
- // add all the log dirs to taskLogsMnonitor.
- JobID jobid = null;
- try {
- jobid = JobID.forName(logDir);
- } catch (IllegalArgumentException ie) {
- deleteLogPath(logDir);
- continue;
- }
- // add the job log directory for deletion with default retain hours,
- // if it is not already added
- if (!completedJobs.containsKey(jobid)) {
- JobCompletedEvent jce =
- new JobCompletedEvent(jobid, now,getUserlogRetainHours(conf));
- userLogManager.addLogEvent(jce);
- }
- }
+ addOldUserLogsForDeletion(userLogDir, conf);
+ String[] localDirs = conf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY); // NOTE(review): getStrings() returns null when the property is not set, which would make the loop below throw NullPointerException - confirm a default is always present
+ for(String localDir : localDirs) {
+ File mapredLocalUserLogDir = new File(localDir +
+ File.separatorChar + TaskLog.USERLOGS_DIR_NAME);
+ addOldUserLogsForDeletion(mapredLocalUserLogDir, conf);
}
}
@@ -196,6 +212,48 @@ public class UserLogCleaner extends Thre
}
/**
+ * Gets the owner of the log path, looking under TaskLog.getUserLogDir() first and then under the userlogs directory of each good local dir.
+ * @param logPath log directory name, resolved relative to each userlogs root
+ * @return the owner of the first location at which logPath was found
+ * @throws IOException if logPath is found under none of those roots
+ */
+ private String getLogUser(String logPath) throws IOException{
+ //Get user from <hadoop.log.dir>/userlogs/jobid path
+ String logRoot = TaskLog.getUserLogDir().toString();
+ String user = null;
+ try{
+ user = localFs.getFileStatus(new Path(logRoot, logPath)).getOwner();
+ }catch(Exception e){
+ //Ignored since this path might have been deleted. NOTE(review): this catches every Exception, not only a missing-file error.
+ }
+
+ //If we found the user for this logPath, then return this user
+ if(user != null) return user;
+
+ //If <hadoop.log.dir>/userlogs/jobid not found, then get user from
+ //any one of existing <mapred.local.dir>/userlogs/jobid path(s)
+ String[] localDirs =
+ userLogManager.getTaskController().getLocalStorage().getGoodLocalDirs();
+ for(String localDir : localDirs) {
+ try{
+ logRoot = localDir + File.separator + TaskLog.USERLOGS_DIR_NAME;
+ user = localFs.getFileStatus(new Path(logRoot, logPath)).getOwner();
+ //If we found the user for this logPath, then break this loop
+ if(user != null) break;
+
+ }catch(Exception e){
+ //Ignored since this path might have been deleted; the next local dir is tried.
+ }
+ }
+
+ if(user == null) {
+ throw new IOException("Userlog path not found for " + logPath);
+ }
+
+ return user;
+ }
+
+ /**
* Deletes the log path.
*
* This path will be removed through {@link CleanupQueue}
@@ -205,8 +263,7 @@ public class UserLogCleaner extends Thre
*/
private void deleteLogPath(String logPath) throws IOException {
LOG.info("Deleting user log path " + logPath);
- String logRoot = TaskLog.getUserLogDir().toString();
- String user = localFs.getFileStatus(new Path(logRoot, logPath)).getOwner();
+ String user = getLogUser(logPath);
TaskController controller = userLogManager.getTaskController();
PathDeletionContext item =
new TaskController.DeletionContext(controller, true, user, logPath);
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java Fri Apr 22 23:13:23 2011
@@ -23,11 +23,14 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
import static org.junit.Assert.*;
@@ -48,15 +51,18 @@ public class TestUserLogCleanup {
private JobID jobid4 = new JobID(jtid, 4);
private File foo = new File(TaskLog.getUserLogDir(), "foo");
private File bar = new File(TaskLog.getUserLogDir(), "bar");
+ private static String TEST_ROOT_DIR =
+ System.getProperty("test.build.data", "/tmp");
- public TestUserLogCleanup() throws IOException {
- Configuration conf = new Configuration();
+ public TestUserLogCleanup() throws IOException, InterruptedException {
+ JobConf conf= new JobConf();
startTT(conf);
}
@After
public void tearDown() throws IOException {
FileUtil.fullyDelete(TaskLog.getUserLogDir());
+ FileUtil.fullyDelete(new File(TEST_ROOT_DIR)); // NOTE(review): TEST_ROOT_DIR falls back to "/tmp" when test.build.data is unset, so this would recursively delete all of /tmp - consider deleting only TEST_ROOT_DIR/userlogs
}
private File localizeJob(JobID jobid) throws IOException {
@@ -77,14 +83,26 @@ public class TestUserLogCleanup {
userLogManager.addLogEvent(jce);
}
- private void startTT(Configuration conf) throws IOException {
+ private void startTT(JobConf conf) throws IOException, InterruptedException {
myClock = new FakeClock(); // clock is reset.
+ String localdirs = TEST_ROOT_DIR + "/userlogs/local/0," +
+ TEST_ROOT_DIR + "/userlogs/local/1";
+ conf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, localdirs);
tt = new TaskTracker();
tt.setConf(new JobConf(conf));
+ LocalDirAllocator localDirAllocator =
+ new LocalDirAllocator("mapred.local.dir");
+ tt.setLocalDirAllocator(localDirAllocator);
+ LocalStorage localStorage = new LocalStorage(conf.getLocalDirs());
+ localStorage.checkLocalDirs();
+ tt.setLocalStorage(localStorage);
localizer = new Localizer(FileSystem.get(conf), conf
.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
tt.setLocalizer(localizer);
userLogManager = new UtilsForTests.InLineUserLogManager(conf);
+ TaskController taskController = userLogManager.getTaskController();
+ taskController.setup(localDirAllocator, localStorage);
+ tt.setTaskController(taskController);
userLogCleaner = userLogManager.getUserLogCleaner();
userLogCleaner.setClock(myClock);
tt.setUserLogManager(userLogManager);
@@ -92,13 +110,13 @@ public class TestUserLogCleanup {
}
private void ttReinited() throws IOException {
- Configuration conf = new Configuration();
+ JobConf conf=new JobConf();
conf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 3);
userLogManager.clearOldUserLogs(conf);
}
- private void ttRestarted() throws IOException {
- Configuration conf = new Configuration();
+ private void ttRestarted() throws IOException, InterruptedException {
+ JobConf conf=new JobConf();
conf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 3);
startTT(conf);
}
@@ -228,9 +246,11 @@ public class TestUserLogCleanup {
* restart.
*
* @throws IOException
+ * @throws InterruptedException
*/
@Test
- public void testUserLogCleanupAfterRestart() throws IOException {
+ public void testUserLogCleanupAfterRestart()
+ throws IOException, InterruptedException {
File jobUserlog1 = localizeJob(jobid1);
File jobUserlog2 = localizeJob(jobid2);
File jobUserlog3 = localizeJob(jobid3);