You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:27:31 UTC
svn commit: r1076976 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
c++/task-controller/ core/org/apache/hadoop/util/
docs/src/documentation/content/xdocs/ mapred/org/apache/hadoop/mapred/
test/org/apache/hadoop/mapred/
Author: omalley
Date: Fri Mar 4 03:27:30 2011
New Revision: 1076976
URL: http://svn.apache.org/viewvc?rev=1076976&view=rev
Log:
commit 89dc6c1752d5d147cfd8333e490cab73ab9d551c
Author: Yahoo\! <lt...@yahoo-inc.com>
Date: Tue Aug 18 09:14:36 2009 -0700
Apply patch for HADOOP:5420 from: http://issues.apache.org/jira/secure/attachment/12414735/5420-ydist.patch.txt
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configuration.c
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configuration.h.in
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java
hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configuration.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/configuration.c?rev=1076976&r1=1076975&r2=1076976&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configuration.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configuration.c Fri Mar 4 03:27:30 2011
@@ -18,8 +18,11 @@
#include "configuration.h"
+
char * hadoop_conf_dir;
+struct configuration config={.size=0, .confdetails=NULL};
+
//clean up method for freeing configuration
void free_configurations() {
int i = 0;
@@ -199,3 +202,36 @@ const char * get_value(char* key) {
return NULL;
}
+const char ** get_values(char * key) {
+ const char ** toPass = NULL;
+ const char * value = get_value(key);
+ char *tempTok = NULL;
+ char *tempstr = NULL;
+ int size = 0;
+ int len;
+ //first allocate an array of MAX_SIZE pointers
+ if(value != NULL) {
+ toPass = (const char **) malloc(sizeof(char *) * MAX_SIZE);
+ tempTok = strtok_r((char *)value, ",", &tempstr);
+ if (tempTok != NULL) {
+ while (1) {
+ toPass[size++] = tempTok;
+ tempTok = strtok_r(NULL, ",", &tempstr);
+ if(tempTok == NULL){
+ break;
+ }
+ if((size % MAX_SIZE) == 0) {
+ toPass = (const char **) realloc(toPass,(sizeof(char *) *
+ (MAX_SIZE * ((size/MAX_SIZE) +1))));
+ }
+ }
+ } else {
+ toPass[size] = (char *)value;
+ }
+ }
+ if(size > 0) {
+ toPass[size] = NULL;
+ }
+ return toPass;
+}
+
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configuration.h.in
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/configuration.h.in?rev=1076976&r1=1076975&r2=1076976&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configuration.h.in (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/configuration.h.in Fri Mar 4 03:27:30 2011
@@ -47,7 +47,7 @@ FILE *LOGFILE;
#define CONF_FILE_PATTERN "%s/conf/taskcontroller.cfg"
#endif
-struct configuration config;
+extern struct configuration config;
//configuration file contents
#ifndef HADOOP_CONF_DIR
extern char *hadoop_conf_dir;
@@ -57,3 +57,6 @@ const char * get_value(char* key);
//method to free allocated configuration
void free_configurations();
+//function to return the array of values configured for the key. Values are
+//comma separated strings.
+const char ** get_values(char* key);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/main.c?rev=1076976&r1=1076975&r2=1076976&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c Fri Mar 4 03:27:30 2011
@@ -25,15 +25,16 @@ int main(int argc, char **argv) {
const char * task_id = NULL;
const char * tt_root = NULL;
int exit_code = 0;
+ const char * task_pid = NULL;
const char* const short_options = "l:";
const struct option long_options[] = { { "log", 1, NULL, 'l' }, { NULL, 0,
NULL, 0 } };
const char* log_file = NULL;
- // when we support additional commands without ttroot, this check
- // may become command specific.
- if (argc < 6) {
+ //Minimum number of arguments required to run the task-controller
+ //command-name user command tt-root
+ if (argc < 3) {
display_usage(stderr);
return INVALID_ARGUMENT_NUMBER;
}
@@ -44,7 +45,6 @@ int main(int argc, char **argv) {
strncpy(hadoop_conf_dir,argv[0],(strlen(argv[0]) - strlen(EXEC_PATTERN)));
hadoop_conf_dir[(strlen(argv[0]) - strlen(EXEC_PATTERN))] = '\0';
#endif
-
do {
next_option = getopt_long(argc, argv, short_options, long_options, NULL);
switch (next_option) {
@@ -88,24 +88,25 @@ int main(int argc, char **argv) {
}
optind = optind + 1;
command = atoi(argv[optind++]);
- job_id = argv[optind++];
- task_id = argv[optind++];
-
#ifdef DEBUG
fprintf(LOGFILE, "main : command provided %d\n",command);
fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
- fprintf(LOGFILE, "main : job id %s \n", job_id);
- fprintf(LOGFILE, "main : task id %s \n", task_id);
#endif
switch (command) {
- case RUN_TASK:
- tt_root = argv[optind];
+ case LAUNCH_TASK_JVM:
+ tt_root = argv[optind++];
+ job_id = argv[optind++];
+ task_id = argv[optind++];
exit_code
= run_task_as_user(user_detail->pw_name, job_id, task_id, tt_root);
break;
- case KILL_TASK:
- tt_root = argv[optind];
- exit_code = kill_user_task(user_detail->pw_name, job_id, task_id, tt_root);
+ case TERMINATE_TASK_JVM:
+ task_pid = argv[optind++];
+ exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGTERM);
+ break;
+ case KILL_TASK_JVM:
+ task_pid = argv[optind++];
+ exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGKILL);
break;
default:
exit_code = INVALID_COMMAND_PROVIDED;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.c?rev=1076976&r1=1076975&r2=1076976&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c Fri Mar 4 03:27:30 2011
@@ -23,9 +23,6 @@ struct passwd *user_detail = NULL;
//LOGFILE
FILE *LOGFILE;
-//hadoop temp dir root which is configured in secure configuration
-const char *mapred_local_dir;
-
//placeholder for global cleanup operations
void cleanup() {
free_configurations();
@@ -38,94 +35,67 @@ int change_user(const char * user) {
}
if(initgroups(user_detail->pw_name, user_detail->pw_gid) != 0) {
- cleanup();
- return SETUID_OPER_FAILED;
+ cleanup();
+ return SETUID_OPER_FAILED;
}
-#ifdef DEBUG
- fprintf(LOGFILE,"change_user : setting user as %s ", user_detail->pw_name);
-#endif
+
errno = 0;
+
setgid(user_detail->pw_gid);
if (errno != 0) {
+ fprintf(LOGFILE, "unable to setgid : %s\n", strerror(errno));
cleanup();
return SETUID_OPER_FAILED;
}
setegid(user_detail->pw_gid);
if (errno != 0) {
+ fprintf(LOGFILE, "unable to setegid : %s\n", strerror(errno));
cleanup();
return SETUID_OPER_FAILED;
}
setuid(user_detail->pw_uid);
if (errno != 0) {
+ fprintf(LOGFILE, "unable to setuid : %s\n", strerror(errno));
cleanup();
return SETUID_OPER_FAILED;
}
seteuid(user_detail->pw_uid);
if (errno != 0) {
+ fprintf(LOGFILE, "unable to seteuid : %s\n", strerror(errno));
cleanup();
return SETUID_OPER_FAILED;
}
return 0;
}
-//Function to set the hadoop.temp.dir key from configuration.
-//would return -1 if the configuration is not proper.
-
-int get_mapred_local_dir() {
-
- if (mapred_local_dir == NULL) {
- mapred_local_dir = get_value(TT_SYS_DIR_KEY);
- }
-
- //after the call it should not be null
- if (mapred_local_dir == NULL) {
- return -1;
- } else {
- return 0;
- }
-
-}
// function to check if the passed tt_root is present in hadoop.tmp.dir
int check_tt_root(const char *tt_root) {
- char *token;
+ char ** mapred_local_dir;
int found = -1;
if (tt_root == NULL) {
return -1;
}
- if (mapred_local_dir == NULL) {
- if (get_mapred_local_dir() < 0) {
- fprintf(LOGFILE, "invalid hadoop config\n");
- return -1;
- }
- }
+ mapred_local_dir = (char **)get_values(TT_SYS_DIR_KEY);
- token = strtok((char *) mapred_local_dir, ",");
- if (token == NULL && mapred_local_dir != NULL) {
-#ifdef DEBUG
- fprintf(LOGFILE,"Single hadoop.tmp.dir configured");
-#endif
- token = (char *)mapred_local_dir;
+ if (mapred_local_dir == NULL) {
+ return -1;
}
- while (1) {
- if (strcmp(tt_root, token) == 0) {
+ while(*mapred_local_dir != NULL) {
+ if(strcmp(*mapred_local_dir,tt_root) == 0) {
found = 0;
break;
}
- token = strtok(NULL, ",");
- if (token == NULL) {
- break;
- }
}
-
+ free(mapred_local_dir);
return found;
-
}
+
/**
* Function to check if the constructed path and absolute
* path resolve to one and same.
@@ -157,39 +127,6 @@ int check_owner(uid_t uid, char *path) {
}
return 0;
}
-/*
- *Function which would return .pid file path which is used while running
- * and killing of the tasks by the user.
- *
- * check TT_SYS_DIR for pattern
- */
-void get_pid_path(const char * jobid, const char * taskid, const char *tt_root,
- char ** pid_path) {
-
- int str_len = strlen(TT_SYS_DIR) + strlen(jobid) + strlen(taskid) + strlen(
- tt_root);
- *pid_path = NULL;
-
- if (mapred_local_dir == NULL) {
- if (get_mapred_local_dir() < 0) {
- return;
- }
- }
-
- *pid_path = (char *) malloc(sizeof(char) * (str_len + 1));
-
- if (*pid_path == NULL) {
- fprintf(LOGFILE, "unable to allocate memory for pid path\n");
- return;
- }
- memset(*pid_path,'\0',str_len+1);
- snprintf(*pid_path, str_len, TT_SYS_DIR, tt_root, jobid, taskid);
-#ifdef DEBUG
- fprintf(LOGFILE, "get_pid_path : pid path = %s\n", *pid_path);
- fflush(LOGFILE);
-#endif
-
-}
/*
* function to provide path to the task file which is created by the tt
@@ -198,19 +135,19 @@ void get_pid_path(const char * jobid, co
*/
void get_task_file_path(const char * jobid, const char * taskid,
const char * tt_root, char **task_script_path) {
+ const char ** mapred_local_dir = get_values(TT_SYS_DIR_KEY);
*task_script_path = NULL;
int str_len = strlen(TT_LOCAL_TASK_SCRIPT_PATTERN) + strlen(jobid) + (strlen(
taskid)) + strlen(tt_root);
if (mapred_local_dir == NULL) {
- if (get_mapred_local_dir() < 0) {
- return;
- }
+ return;
}
*task_script_path = (char *) malloc(sizeof(char) * (str_len + 1));
if (*task_script_path == NULL) {
fprintf(LOGFILE, "Unable to allocate memory for task_script_path \n");
+ free(mapred_local_dir);
return;
}
@@ -221,7 +158,7 @@ void get_task_file_path(const char * job
fprintf(LOGFILE, "get_task_file_path : task script path = %s\n", *task_script_path);
fflush(LOGFILE);
#endif
-
+ free(mapred_local_dir);
}
//end of private functions
@@ -247,142 +184,55 @@ int get_user_details(const char *user) {
*Function used to launch a task as the provided user.
* First the function checks if the tt_root passed is found in
* hadoop.temp.dir
- *
- *Then gets the path to which the task has to write its pid from
- *get_pid_path.
- *
- * THen writes its pid into the file.
- *
- * Then changes the permission of the pid file into 600
- *
- * Then uses get_task_file_path to fetch the task script file path.
- *
+ * Uses get_task_file_path to fetch the task script file path.
* Does an execlp on the same in order to replace the current image with
* task image.
- *
*/
int run_task_as_user(const char * user, const char *jobid, const char *taskid,
const char *tt_root) {
char *task_script_path = NULL;
- char *pid_path = NULL;
- char *task_script = NULL;
- FILE *file_handle = NULL;
int exit_code = 0;
uid_t uid = getuid();
+ if(jobid == NULL || taskid == NULL) {
+ return INVALID_ARGUMENT_NUMBER;
+ }
+
#ifdef DEBUG
fprintf(LOGFILE,"run_task_as_user : Job id : %s \n", jobid);
fprintf(LOGFILE,"run_task_as_user : task id : %s \n", taskid);
fprintf(LOGFILE,"run_task_as_user : tt_root : %s \n", tt_root);
fflush(LOGFILE);
#endif
-
+ //Check tt_root before switching the user, as reading configuration
+ //file requires privileged access.
if (check_tt_root(tt_root) < 0) {
fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root);
cleanup();
return INVALID_TT_ROOT;
}
- get_pid_path(jobid, taskid, tt_root, &pid_path);
- if (pid_path == NULL) {
- fprintf(LOGFILE, "Invalid task-pid path provided");
- cleanup();
- return INVALID_PID_PATH;
- }
- errno = 0;
- file_handle = fopen(pid_path, "w");
-
- if (file_handle == NULL) {
- fprintf(LOGFILE, "Error opening task-pid file %s :%s\n", pid_path,
- strerror(errno));
- exit_code = UNABLE_TO_OPEN_PID_FILE_WRITE_MODE;
- goto cleanup;
- }
-
- errno = 0;
- if (fprintf(file_handle, "%d\n", getpid()) < 0) {
- fprintf(LOGFILE, "Error writing to task-pid file :%s\n", strerror(errno));
- exit_code = UNABLE_TO_WRITE_TO_PID_FILE;
- goto cleanup;
- }
-
- fflush(file_handle);
- fclose(file_handle);
- file_handle = NULL;
- //change the permissions of the file
- errno = 0;
- //setting permission to 600
- if (chmod(pid_path, S_IREAD | S_IWRITE) < 0) {
- fprintf(LOGFILE, "Error changing permission of %s task-pid file : %s",
- pid_path, strerror(errno));
- errno = 0;
- if (remove(pid_path) < 0) {
- fprintf(LOGFILE, "Error deleting %s task-pid file : %s", pid_path,
- strerror(errno));
- exit_code = UNABLE_TO_CHANGE_PERMISSION_AND_DELETE_PID_FILE;
- } else {
- exit_code = UNABLE_TO_CHANGE_PERMISSION_OF_PID_FILE;
- }
- goto cleanup;
- }
-
- if(chown(pid_path, uid, getgid()) < 0) {
- fprintf(LOGFILE, "Error changing ownershipt of %s task-pid file : %s\n",
- pid_path, strerror(errno));
- errno = 0;
- if (remove(pid_path) < 0) {
- fprintf(LOGFILE, "Error deleting %s task-pid file : %s", pid_path,
- strerror(errno));
- exit_code = UNABLE_TO_CHANGE_OWNERSHIP_AND_DELETE_PID_FILE;
- } else {
- exit_code = UNABLE_TO_CHANGE_OWNERSHIP_OF_PID_FILE;
- }
- goto cleanup;
- }
- //while checking path make sure the target of the path exists otherwise
- //check_paths would fail always. So write out .pid file then check if
- //it correctly resolves. If not delete the pid file and bail out.
- errno = 0;
- exit_code = check_path(pid_path);
- if(exit_code != 0) {
- remove(pid_path);
- goto cleanup;
- }
-
- //free pid_t path which is allocated
- free(pid_path);
- pid_path = NULL;
-
//change the user
- fcloseall();
fclose(LOGFILE);
+ fcloseall();
umask(0);
if (change_user(user) != 0) {
- exit_code = SETUID_OPER_FAILED;
- goto cleanup;
- }
-
- //change set the launching process as the session leader.
- if(setsid() < 0) {
- exit_code = SETSID_FAILED;
- goto cleanup;
+ cleanup();
+ return SETUID_OPER_FAILED;
}
get_task_file_path(jobid, taskid, tt_root, &task_script_path);
-
if (task_script_path == NULL) {
- exit_code = INVALID_TASK_SCRIPT_PATH;
- goto cleanup;
+ cleanup();
+ return INVALID_TASK_SCRIPT_PATH;
}
- //resolve paths.
errno = 0;
exit_code = check_path(task_script_path);
- if(exit_code !=0) {
+ if(exit_code != 0) {
goto cleanup;
}
errno = 0;
- //get stat of the task file.
exit_code = check_owner(uid, task_script_path);
if(exit_code != 0) {
goto cleanup;
@@ -398,127 +248,53 @@ int run_task_as_user(const char * user,
return exit_code;
cleanup:
- if (pid_path != NULL) {
- free(pid_path);
- }
if (task_script_path != NULL) {
free(task_script_path);
}
- if (file_handle != NULL) {
- fclose(file_handle);
- }
// free configurations
cleanup();
return exit_code;
}
+
/**
- * Function used to terminate a task launched by the user.
- *
- * The function first checks if the passed tt-root is found in
- * configured hadoop.temp.dir (which is a list of tt_roots).
- *
- * Then gets the task-pid path using function get_pid_path.
- *
- * reads the task-pid from the file which is mentioned by get_pid_path
- *
- * kills the task by sending SIGTERM to that particular process.
- *
+ * Function used to terminate/kill a task launched by the user.
+ * The function sends appropriate signal to the process group
+ * specified by the task_pid.
*/
-int kill_user_task(const char *user, const char *jobid, const char *taskid,
- const char *tt_root) {
+int kill_user_task(const char *user, const char *task_pid, int sig) {
int pid = 0;
- int i = 0;
- char *pid_path = NULL;
- FILE *file_handle = NULL;
- const char *sleep_interval_char;
- int sleep_interval = 0;
- uid_t uid = getuid();
- int exit_code = 0;
-#ifdef DEBUG
- fprintf(LOGFILE,"kill_user_task : Job id : %s \n", jobid);
- fprintf(LOGFILE,"kill_user_task : task id : %s \n", taskid);
- fprintf(LOGFILE,"kill_user_task : tt_root : %s \n", tt_root);
- fflush(LOGFILE);
-#endif
- if (check_tt_root(tt_root) < 0) {
- fprintf(LOGFILE, "invalid tt root specified");
- cleanup();
- return INVALID_TT_ROOT;
- }
- get_pid_path(jobid, taskid, tt_root, &pid_path);
- if (pid_path == NULL) {
- cleanup();
- return INVALID_PID_PATH;
- }
- errno = 0;
- exit_code = check_path(pid_path);
- if(exit_code != 0) {
- free(pid_path);
- cleanup();
- return exit_code;
- }
- errno = 0;
- exit_code = check_owner(uid, pid_path);
- if(exit_code != 0) {
- free(pid_path);
- cleanup();
- return exit_code;
+ if(task_pid == NULL) {
+ return INVALID_ARGUMENT_NUMBER;
}
+ pid = atoi(task_pid);
-#ifdef DEBUG
- fprintf(LOGFILE,"kill_user_task : task-pid path :%s \n",pid_path);
- fflush(LOGFILE);
-#endif
- errno = 0;
- file_handle = fopen(pid_path, "r");
- if (file_handle == NULL) {
- fprintf(LOGFILE, "unable to open task-pid file :%s \n", pid_path);
- free(pid_path);
- cleanup();
- return UNABLE_TO_OPEN_PID_FILE_READ_MODE;
+ if(pid <= 0) {
+ return INVALID_TASK_PID;
}
- fscanf(file_handle, "%d", &pid);
- fclose(file_handle);
fclose(LOGFILE);
- free(pid_path);
- if (pid == 0) {
- cleanup();
- return UNABLE_TO_READ_PID;
- }
+ fcloseall();
if (change_user(user) != 0) {
cleanup();
return SETUID_OPER_FAILED;
}
- //kill the entire session.
- if (kill(-pid, SIGTERM) < 0) {
- fprintf(LOGFILE, "%s\n", strerror(errno));
- cleanup();
- return UNABLE_TO_KILL_TASK;
+
+ //Don't continue if the process-group is not alive anymore.
+ if(kill(-pid,0) < 0) {
+ errno = 0;
+ return 0;
}
- //get configured interval time.
- sleep_interval_char = get_value("mapred.tasktracker.tasks.sleeptime-before-sigkill");
- if(sleep_interval_char != NULL) {
- sleep_interval = atoi(sleep_interval_char);
- }
- if(sleep_interval == 0) {
- sleep_interval = 5;
- }
- //sleep for configured interval.
- sleep(sleep_interval);
- //check pid exists
- if(kill(-pid,0) == 0) {
- //if pid present then sigkill it
- if(kill(-pid, SIGKILL) <0) {
- //ignore no such pid present.
- if(errno != ESRCH) {
- //log error ,exit unclean
- cleanup();
- return UNABLE_TO_KILL_TASK;
- }
+
+ if (kill(-pid, sig) < 0) {
+ if(errno != ESRCH) {
+ fprintf(LOGFILE, "Error is %s\n", strerror(errno));
+ cleanup();
+ return UNABLE_TO_KILL_TASK;
}
+ errno = 0;
}
cleanup();
return 0;
}
+
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.h?rev=1076976&r1=1076975&r2=1076976&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h Fri Mar 4 03:27:30 2011
@@ -28,51 +28,37 @@
#include <sys/stat.h>
#include <sys/signal.h>
#include <getopt.h>
-#include <grp.h>
+#include<grp.h>
#include "configuration.h"
//command definitions
enum command {
- RUN_TASK,
- KILL_TASK
+ LAUNCH_TASK_JVM,
+ TERMINATE_TASK_JVM,
+ KILL_TASK_JVM
};
enum errorcodes {
INVALID_ARGUMENT_NUMBER = 1,
- INVALID_USER_NAME,
- INVALID_COMMAND_PROVIDED,
- SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS,
- OUT_OF_MEMORY,
- INVALID_TT_ROOT,
- INVALID_PID_PATH,
- UNABLE_TO_OPEN_PID_FILE_WRITE_MODE,
- UNABLE_TO_OPEN_PID_FILE_READ_MODE,
- UNABLE_TO_WRITE_TO_PID_FILE,
- UNABLE_TO_CHANGE_PERMISSION_OF_PID_FILE,
- UNABLE_TO_CHANGE_PERMISSION_AND_DELETE_PID_FILE,
- SETUID_OPER_FAILED,
- INVALID_TASK_SCRIPT_PATH,
- UNABLE_TO_EXECUTE_TASK_SCRIPT,
- UNABLE_TO_READ_PID,
- UNABLE_TO_KILL_TASK,
- UNABLE_TO_FIND_PARENT_PID_FILE,
- UNABLE_TO_READ_PARENT_PID,
- SETSID_FAILED,
- ERROR_RESOLVING_FILE_PATH,
- RELATIVE_PATH_COMPONENTS_IN_FILE_PATH,
- UNABLE_TO_STAT_FILE,
- FILE_NOT_OWNED_BY_TASKTRACKER,
- UNABLE_TO_CHANGE_OWNERSHIP_OF_PID_FILE,
- UNABLE_TO_CHANGE_OWNERSHIP_AND_DELETE_PID_FILE
+ INVALID_USER_NAME, //2
+ INVALID_COMMAND_PROVIDED, //3
+ SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS, //4
+ INVALID_TT_ROOT, //5
+ SETUID_OPER_FAILED, //6
+ INVALID_TASK_SCRIPT_PATH, //7
+ UNABLE_TO_EXECUTE_TASK_SCRIPT, //8
+ UNABLE_TO_KILL_TASK, //9
+ INVALID_PROCESS_LAUNCHING_TASKCONTROLLER, //10
+ INVALID_TASK_PID, //11
+ ERROR_RESOLVING_FILE_PATH, //12
+ RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //13
+ UNABLE_TO_STAT_FILE, //14
+ FILE_NOT_OWNED_BY_TASKTRACKER //15
};
-#define TT_PID_PATTERN "%s/hadoop-%s-tasktracker.pid"
-
#define TT_LOCAL_TASK_SCRIPT_PATTERN "%s/taskTracker/jobcache/%s/%s/taskjvm.sh"
-#define TT_SYS_DIR "%s/taskTracker/jobcache/%s/%s/.pid"
-
#define TT_SYS_DIR_KEY "mapred.local.dir"
#define MAX_ITEMS 10
@@ -90,8 +76,6 @@ void display_usage(FILE *stream);
int run_task_as_user(const char * user, const char *jobid, const char *taskid, const char *tt_root);
-int verify_parent();
-
-int kill_user_task(const char *user, const char *jobid, const char *taskid, const char *tt_root);
+int kill_user_task(const char *user, const char *task_pid, int sig);
int get_user_details(const char *user);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java?rev=1076976&r1=1076975&r2=1076976&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java Fri Mar 4 03:27:30 2011
@@ -54,73 +54,24 @@ public class ProcessTree {
}
/**
- * Kills the process(OR process group) by sending the signal SIGKILL
- * in the current thread
- * @param pid Process id(OR process group id) of to-be-deleted-process
- * @param isProcessGroup Is pid a process group id of to-be-deleted-processes
- * @param sleepTimeBeforeSigKill wait time before sending SIGKILL after
- * sending SIGTERM
- */
- private static void sigKillInCurrentThread(String pid, boolean isProcessGroup,
- long sleepTimeBeforeSigKill) {
- // Kill the subprocesses of root process(even if the root process is not
- // alive) if process group is to be killed.
- if (isProcessGroup || ProcessTree.isAlive(pid)) {
- try {
- // Sleep for some time before sending SIGKILL
- Thread.sleep(sleepTimeBeforeSigKill);
- } catch (InterruptedException i) {
- LOG.warn("Thread sleep is interrupted.");
- }
-
- ShellCommandExecutor shexec = null;
-
- try {
- String pid_pgrpid;
- if(isProcessGroup) {//kill the whole process group
- pid_pgrpid = "-" + pid;
- }
- else {//kill single process
- pid_pgrpid = pid;
- }
-
- String[] args = { "kill", "-9", pid_pgrpid };
- shexec = new ShellCommandExecutor(args);
- shexec.execute();
- } catch (IOException ioe) {
- LOG.warn("Error executing shell command " + ioe);
- } finally {
- if(isProcessGroup) {
- LOG.info("Killing process group" + pid + " with SIGKILL. Exit code "
- + shexec.getExitCode());
- }
- else {
- LOG.info("Killing process " + pid + " with SIGKILL. Exit code "
- + shexec.getExitCode());
- }
- }
- }
- }
-
- /** Kills the process(OR process group) by sending the signal SIGKILL
- * @param pid Process id(OR process group id) of to-be-deleted-process
- * @param isProcessGroup Is pid a process group id of to-be-deleted-processes
+ * Destroy the process-tree.
+ * @param pid process id of the root process of the subtree of processes
+ * to be killed
* @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
* after sending SIGTERM
+ * @param isProcessGroup pid is a process group leader or not
* @param inBackground Process is to be killed in the back ground with
* a separate thread
*/
- private static void sigKill(String pid, boolean isProcessGroup,
- long sleeptimeBeforeSigkill, boolean inBackground) {
-
- if(inBackground) { // use a separate thread for killing
- SigKillThread sigKillThread = new SigKillThread(pid, isProcessGroup,
- sleeptimeBeforeSigkill);
- sigKillThread.setDaemon(true);
- sigKillThread.start();
+ public static void destroy(String pid, long sleeptimeBeforeSigkill,
+ boolean isProcessGroup, boolean inBackground) {
+ if(isProcessGroup) {
+ destroyProcessGroup(pid, sleeptimeBeforeSigkill, inBackground);
}
else {
- sigKillInCurrentThread(pid, isProcessGroup, sleeptimeBeforeSigkill);
+ //TODO: Destroy all the processes in the subtree in this case also.
+ // For the time being, killing only the root process.
+ destroyProcess(pid, sleeptimeBeforeSigkill, inBackground);
}
}
@@ -133,6 +84,29 @@ public class ProcessTree {
*/
protected static void destroyProcess(String pid, long sleeptimeBeforeSigkill,
boolean inBackground) {
+ terminateProcess(pid);
+ sigKill(pid, false, sleeptimeBeforeSigkill, inBackground);
+ }
+
+ /** Destroy the process group.
+ * @param pgrpId Process group id of to-be-killed-processes
+ * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+ * after sending SIGTERM
+ * @param inBackground Process group is to be killed in the back ground with
+ * a separate thread
+ */
+ protected static void destroyProcessGroup(String pgrpId,
+ long sleeptimeBeforeSigkill, boolean inBackground) {
+ terminateProcessGroup(pgrpId);
+ sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground);
+ }
+
+ /**
+ * Sends terminate signal to the process, allowing it to gracefully exit.
+ *
+ * @param pid pid of the process to be sent SIGTERM
+ */
+ public static void terminateProcess(String pid) {
ShellCommandExecutor shexec = null;
try {
String[] args = { "kill", pid };
@@ -144,19 +118,15 @@ public class ProcessTree {
LOG.info("Killing process " + pid +
" with SIGTERM. Exit code " + shexec.getExitCode());
}
-
- sigKill(pid, false, sleeptimeBeforeSigkill, inBackground);
}
-
- /** Destroy the process group.
- * @param pgrpId Process group id of to-be-killed-processes
- * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
- * after sending SIGTERM
- * @param inBackground Process group is to be killed in the back ground with
- * a separate thread
+
+ /**
+ * Sends terminate signal to all the processes belonging to the passed process
+ * group, allowing the group to gracefully exit.
+ *
+ * @param pgrpId process group id
*/
- protected static void destroyProcessGroup(String pgrpId,
- long sleeptimeBeforeSigkill, boolean inBackground) {
+ public static void terminateProcessGroup(String pgrpId) {
ShellCommandExecutor shexec = null;
try {
String[] args = { "kill", "--", "-" + pgrpId };
@@ -168,37 +138,115 @@ public class ProcessTree {
LOG.info("Killing all processes in the process group " + pgrpId +
" with SIGTERM. Exit code " + shexec.getExitCode());
}
-
- sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground);
}
/**
- * Destroy the process-tree.
- * @param pid process id of the root process of the subtree of processes
- * to be killed
+ * Kills the process(OR process group) by sending the signal SIGKILL
+ * in the current thread
+ * @param pid Process id(OR process group id) of to-be-deleted-process
+ * @param isProcessGroup Is pid a process group id of to-be-deleted-processes
+ * @param sleepTimeBeforeSigKill wait time before sending SIGKILL after
+ * sending SIGTERM
+ */
+ private static void sigKillInCurrentThread(String pid, boolean isProcessGroup,
+ long sleepTimeBeforeSigKill) {
+ // Kill the subprocesses of root process(even if the root process is not
+ // alive) if process group is to be killed.
+ if (isProcessGroup || ProcessTree.isAlive(pid)) {
+ try {
+ // Sleep for some time before sending SIGKILL
+ Thread.sleep(sleepTimeBeforeSigKill);
+ } catch (InterruptedException i) {
+ LOG.warn("Thread sleep is interrupted.");
+ }
+ if(isProcessGroup) {
+ killProcessGroup(pid);
+ } else {
+ killProcess(pid);
+ }
+ }
+ }
+
+
+ /** Kills the process(OR process group) by sending the signal SIGKILL
+ * @param pid Process id(OR process group id) of to-be-deleted-process
+ * @param isProcessGroup Is pid a process group id of to-be-deleted-processes
* @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
* after sending SIGTERM
- * @param isProcessGroup pid is a process group leader or not
* @param inBackground Process is to be killed in the back ground with
* a separate thread
*/
- public static void destroy(String pid, long sleeptimeBeforeSigkill,
- boolean isProcessGroup, boolean inBackground) {
- if(isProcessGroup) {
- destroyProcessGroup(pid, sleeptimeBeforeSigkill, inBackground);
+ private static void sigKill(String pid, boolean isProcessGroup,
+ long sleeptimeBeforeSigkill, boolean inBackground) {
+
+ if(inBackground) { // use a separate thread for killing
+ SigKillThread sigKillThread = new SigKillThread(pid, isProcessGroup,
+ sleeptimeBeforeSigkill);
+ sigKillThread.setDaemon(true);
+ sigKillThread.start();
}
else {
- //TODO: Destroy all the processes in the subtree in this case also.
- // For the time being, killing only the root process.
- destroyProcess(pid, sleeptimeBeforeSigkill, inBackground);
+ sigKillInCurrentThread(pid, isProcessGroup, sleeptimeBeforeSigkill);
+ }
+ }
+
+ /**
+ * Sends kill signal to process, forcefully terminating the process.
+ *
+ * @param pid process id
+ */
+ public static void killProcess(String pid) {
+
+ //If process tree is not alive then return immediately.
+ if(!ProcessTree.isAlive(pid)) {
+ return;
+ }
+ String[] args = { "kill", "-9", pid };
+ ShellCommandExecutor shexec = new ShellCommandExecutor(args);
+ try {
+ shexec.execute();
+ } catch (IOException e) {
+ LOG.warn("Error sending SIGKILL to process "+ pid + " ."+
+ StringUtils.stringifyException(e));
+ } finally {
+ LOG.info("Killing process " + pid + " with SIGKILL. Exit code "
+ + shexec.getExitCode());
}
}
+ /**
+ * Sends kill signal to all processes belonging to the same process group,
+ * forcefully terminating the process group.
+ *
+ * @param pgrpId process group id
+ */
+ public static void killProcessGroup(String pgrpId) {
+
+ //If process tree is not alive then return immediately.
+ if(!ProcessTree.isProcessGroupAlive(pgrpId)) {
+ return;
+ }
+ String[] args = { "kill", "-9", "-"+pgrpId };
+ ShellCommandExecutor shexec = new ShellCommandExecutor(args);
+ try {
+ shexec.execute();
+ } catch (IOException e) {
+ LOG.warn("Error sending SIGKILL to process group "+ pgrpId + " ."+
+ StringUtils.stringifyException(e));
+ } finally {
+ LOG.info("Killing process group" + pgrpId + " with SIGKILL. Exit code "
+ + shexec.getExitCode());
+ }
+ }
+
/**
* Is the process with PID pid still alive?
* This method assumes that isAlive is called on a pid that was alive not
* too long ago, and hence assumes no chance of pid-wrapping-around.
+ *
+ * @param pid pid of the process to check.
+ * @return true if process is alive.
*/
public static boolean isAlive(String pid) {
ShellCommandExecutor shexec = null;
@@ -215,6 +263,32 @@ public class ProcessTree {
}
return (shexec.getExitCode() == 0 ? true : false);
}
+
+ /**
+ * Is the process group with the given id still alive?
+ *
+ * This method assumes that it is called on a process group that was alive
+ * not too long ago, and hence assumes no chance of pid-wrapping-around.
+ *
+ * @param pgrpId process group id
+ * @return true if any process in the group is alive.
+ */
+ public static boolean isProcessGroupAlive(String pgrpId) {
+ ShellCommandExecutor shexec = null;
+ try {
+ String[] args = { "kill", "-0", "-"+pgrpId };
+ shexec = new ShellCommandExecutor(args);
+ shexec.execute();
+ } catch (ExitCodeException ee) {
+ return false;
+ } catch (IOException ioe) {
+ LOG.warn("Error executing shell command "
+ + Arrays.toString(shexec.getExecString()) + ioe);
+ return false;
+ }
+ return (shexec.getExitCode() == 0 ? true : false);
+ }
+
/**
* Helper thread class that kills process-tree with SIGKILL in background
Modified: hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml?rev=1076976&r1=1076975&r2=1076976&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml Fri Mar 4 03:27:30 2011
@@ -556,7 +556,10 @@
<p>
The executable must be deployed as a setuid executable, by changing
- the ownership to <em>root</em> and giving it permissions <em>4755</em>.
+ the ownership to <em>root</em>, the group ownership to that of the tasktracker,
+ and giving it permissions <em>4510</em>. Please note that the
+ group which owns task-controller should contain only the tasktracker
+ as its member, and not users who submit jobs.
</p>
<p>The executable requires a configuration file called
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1076976&r1=1076975&r2=1076976&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Mar 4 03:27:30 2011
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.Shell;
@@ -61,44 +60,7 @@ class DefaultTaskController extends Task
context.shExec = shexec;
shexec.execute();
}
-
- /**
- * Kills the JVM running the task stored in the context.
- *
- * @param context the context storing the task running within the JVM
- * that needs to be killed.
- */
- void killTaskJVM(TaskController.TaskControllerContext context) {
- ShellCommandExecutor shexec = context.shExec;
-
- if (shexec != null) {
- Process process = shexec.getProcess();
- if (Shell.WINDOWS) {
- // Currently we don't use setsid on WINDOWS. So kill the process alone.
- if (process != null) {
- process.destroy();
- }
- }
- else { // In addition to the task JVM, kill its subprocesses also.
- String pid = context.pid;
- if (pid != null) {
- ProcessTree.destroy(pid, context.sleeptimeBeforeSigkill,
- ProcessTree.isSetsidAvailable, false);
- try {
- if (process != null) {
- LOG.info("Process exited with exit code:" + process.waitFor());
- }
- } catch (InterruptedException ie) {}
- }
- else if (process != null) {
- // kill the task JVM alone, if we don't have the process group id
- process.destroy();
- }
- }
- }
- }
-
-
+
/**
* Initialize the task environment.
*
@@ -126,5 +88,50 @@ class DefaultTaskController extends Task
@Override
void initializeJob(JobID jobId) {
}
+
+ @Override
+ void terminateTask(TaskControllerContext context) {
+ ShellCommandExecutor shexec = context.shExec;
+ if (shexec != null) {
+ Process process = shexec.getProcess();
+ if (Shell.WINDOWS) {
+ // Currently we don't use setsid on WINDOWS.
+ //So kill the process alone.
+ if (process != null) {
+ process.destroy();
+ }
+ }
+ else { // In addition to the task JVM, kill its subprocesses also.
+ String pid = context.pid;
+ if (pid != null) {
+ if(ProcessTree.isSetsidAvailable) {
+ ProcessTree.terminateProcessGroup(pid);
+ }else {
+ ProcessTree.terminateProcess(pid);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ void killTask(TaskControllerContext context) {
+ ShellCommandExecutor shexec = context.shExec;
+ if (shexec != null) {
+ if (Shell.WINDOWS) {
+ //We don't send a kill signal to the process on Windows, as we have
+ //already done a process.destroy() in terminateTask()
+ return;
+ }
+ String pid = context.pid;
+ if (pid != null) {
+ if(ProcessTree.isSetsidAvailable) {
+ ProcessTree.killProcessGroup(pid);
+ }else {
+ ProcessTree.killProcess(pid);
+ }
+ }
+ }
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1076976&r1=1076975&r2=1076976&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Mar 4 03:27:30 2011
@@ -433,7 +433,7 @@ class JvmManager {
initalContext.sleeptimeBeforeSigkill = tracker.getJobConf()
.getLong("mapred.tasktracker.tasks.sleeptime-before-sigkill",
ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
- controller.killTaskJVM(initalContext);
+ controller.destroyTaskJVM(initalContext);
} else {
LOG.info(String.format("JVM Not killed %s but just removed", jvmId
.toString()));
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1076976&r1=1076975&r2=1076976&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar 4 03:27:30 2011
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
@@ -32,6 +31,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
/**
@@ -109,6 +109,7 @@ class LinuxTaskController extends TaskCo
*/
enum TaskCommands {
LAUNCH_TASK_JVM,
+ TERMINATE_TASK_JVM,
KILL_TASK_JVM
}
@@ -127,8 +128,11 @@ class LinuxTaskController extends TaskCo
String cmdLine =
TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
env.logSize, true);
+
StringBuffer sb = new StringBuffer();
- //export out all the environment variable before child command.
+ //export all the environment variables before the child command, as
+ //setuid/setgid binaries would not receive any environment
+ //variables which begin with LD_*.
for(Entry<String, String> entry : env.env.entrySet()) {
sb.append("export ");
sb.append(entry.getKey());
@@ -140,34 +144,50 @@ class LinuxTaskController extends TaskCo
// write the command to a file in the
// task specific cache directory
writeCommand(sb.toString(), getTaskCacheDirectory(context));
+
// Call the taskcontroller with the right parameters.
- List<String> launchTaskJVMArgs = buildTaskCommandArgs(context);
+ List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context);
ShellCommandExecutor shExec = buildTaskControllerExecutor(
TaskCommands.LAUNCH_TASK_JVM,
- context.task.getUser(),
+ env.conf.getUser(),
launchTaskJVMArgs, env);
context.shExec = shExec;
- shExec.execute();
- LOG.debug("output after executing task jvm = " + shExec.getOutput());
+ try {
+ shExec.execute();
+ } catch (Exception e) {
+ LOG.warn("Exception thrown while launching task JVM : " +
+ StringUtils.stringifyException(e));
+ LOG.warn("Exit code from task is : " + shExec.getExitCode());
+ LOG.warn("Output from task-contoller is : " + shExec.getOutput());
+ throw new IOException(e);
+ }
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("output after executing task jvm = " + shExec.getOutput());
+ }
}
- // convenience API for building command arguments for specific commands
- private List<String> buildTaskCommandArgs(TaskControllerContext context) {
+ /**
+ * Returns list of arguments to be passed while launching task VM.
+ * See {@code buildTaskControllerExecutor(TaskCommands,
+ * String, List<String>, JvmEnv)} documentation.
+ * @param context
+ * @return Argument to be used while launching Task VM
+ */
+ private List<String> buildLaunchTaskArgs(TaskControllerContext context) {
List<String> commandArgs = new ArrayList<String>(3);
String taskId = context.task.getTaskID().toString();
String jobId = getJobId(context);
+ LOG.debug("getting the task directory as: "
+ + getTaskCacheDirectory(context));
+ commandArgs.add(getDirectoryChosenForTask(
+ new File(getTaskCacheDirectory(context)),
+ context));
commandArgs.add(jobId);
if(!context.task.isTaskCleanupTask()) {
commandArgs.add(taskId);
}else {
commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
}
-
- LOG.debug("getting the task directory as: "
- + getTaskCacheDirectory(context));
- commandArgs.add(getDirectoryChosenForTask(
- new File(getTaskCacheDirectory(context)),
- context));
return commandArgs;
}
@@ -183,7 +203,7 @@ class LinuxTaskController extends TaskCo
// in mapred.local.dir chosen for storing data pertaining to
// this task.
private String getDirectoryChosenForTask(File directory,
- TaskControllerContext context) {
+ TaskControllerContext context) {
String jobId = getJobId(context);
String taskId = context.task.getTaskID().toString();
for (String dir : mapredLocalDirs) {
@@ -201,34 +221,6 @@ class LinuxTaskController extends TaskCo
}
/**
- * Kill a launched task JVM running as the user of the job.
- *
- * This method will launch the task controller setuid executable
- * that in turn will kill the task JVM by sending a kill signal.
- */
- void killTaskJVM(TaskControllerContext context) {
-
- if(context.task == null) {
- LOG.info("Context task null not killing the JVM");
- return;
- }
-
- JvmEnv env = context.env;
- List<String> killTaskJVMArgs = buildTaskCommandArgs(context);
- try {
- ShellCommandExecutor shExec = buildTaskControllerExecutor(
- TaskCommands.KILL_TASK_JVM,
- context.task.getUser(),
- killTaskJVMArgs,
- context.env);
- shExec.execute();
- LOG.debug("Command output :" +shExec.getOutput());
- } catch (IOException ioe) {
- LOG.warn("IOException in killing task: " + ioe.getMessage());
- }
- }
-
- /**
* Setup appropriate permissions for directories and files that
* are used by the task.
*
@@ -289,9 +281,24 @@ class LinuxTaskController extends TaskCo
LOG.warn("Could not change permissions for directory " + dir);
}
}
-
- // convenience API to create the executor for launching the
- // setuid script.
+ /**
+ * Builds the command line for launching/terminating/killing task JVM.
+ * Following is the format for launching/terminating/killing task JVM
+ * <br/>
+ * For launching following is command line argument:
+ * <br/>
+ * {@code user-name command tt-root job_id task_id}
+ * <br/>
+ * For terminating/killing task jvm.
+ * {@code user-name command tt-root task-pid}
+ *
+ * @param command command to be executed.
+ * @param userName user name
+ * @param cmdArgs list of extra arguments
+ * @param env JVM environment variables.
+ * @return {@link ShellCommandExecutor}
+ * @throws IOException
+ */
private ShellCommandExecutor buildTaskControllerExecutor(TaskCommands command,
String userName,
List<String> cmdArgs, JvmEnv env)
@@ -428,5 +435,65 @@ class LinuxTaskController extends TaskCo
}
}
+ /**
+ * API which builds the command line to be passed to the LinuxTaskController
+ * binary to terminate/kill the task. See
+ * {@code buildTaskControllerExecutor(TaskCommands,
+ * String, List<String>, JvmEnv)} documentation.
+ *
+ *
+ * @param context context of task which has to be passed kill signal.
+ *
+ */
+ private List<String> buildKillTaskCommandArgs(TaskControllerContext
+ context){
+ List<String> killTaskJVMArgs = new ArrayList<String>();
+ killTaskJVMArgs.add(context.pid);
+ return killTaskJVMArgs;
+ }
+
+ /**
+ * Convenience method used to send the appropriate kill signal to the task
+ * JVM
+ * @param context
+ * @param command
+ * @throws IOException
+ */
+ private void finishTask(TaskControllerContext context,
+ TaskCommands command) throws IOException{
+ if(context.task == null) {
+ LOG.info("Context task null not killing the JVM");
+ return;
+ }
+ ShellCommandExecutor shExec = buildTaskControllerExecutor(
+ command, context.env.conf.getUser(),
+ buildKillTaskCommandArgs(context), context.env);
+ try {
+ shExec.execute();
+ } catch (Exception e) {
+ LOG.warn("Output from task-contoller is : " + shExec.getOutput());
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ void terminateTask(TaskControllerContext context) {
+ try {
+ finishTask(context, TaskCommands.TERMINATE_TASK_JVM);
+ } catch (Exception e) {
+ LOG.warn("Exception thrown while sending kill to the Task VM " +
+ StringUtils.stringifyException(e));
+ }
+ }
+
+ @Override
+ void killTask(TaskControllerContext context) {
+ try {
+ finishTask(context, TaskCommands.KILL_TASK_JVM);
+ } catch (Exception e) {
+ LOG.warn("Exception thrown while sending destroy to the Task VM " +
+ StringUtils.stringifyException(e));
+ }
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1076976&r1=1076975&r2=1076976&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Mar 4 03:27:30 2011
@@ -19,10 +19,12 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
/**
@@ -38,6 +40,8 @@ abstract class TaskController implements
private Configuration conf;
+ public static final Log LOG = LogFactory.getLog(TaskController.class);
+
public Configuration getConf() {
return conf;
}
@@ -63,13 +67,29 @@ abstract class TaskController implements
throws IOException;
/**
- * Kill a task JVM
+ * Top-level method to clean up a task JVM.
+ *
+ * The current implementation does the following.
+ * <ol>
+ * <li>Sends a graceful terminate signal to the task JVM, allowing its
+ * sub-processes to clean up.</li>
+ * <li>Waits for the stipulated period.</li>
+ * <li>Sends a forceful kill signal to the task JVM, terminating all its
+ * sub-processes forcefully.</li>
+ * </ol>
*
- * This method defines how a JVM launched to execute one or more
- * tasks will be killed.
- * @param context
+ * @param context the task for which kill signal has to be sent.
*/
- abstract void killTaskJVM(TaskControllerContext context);
+ final void destroyTaskJVM(TaskControllerContext context) {
+ terminateTask(context);
+ try {
+ Thread.sleep(context.sleeptimeBeforeSigkill);
+ } catch (InterruptedException e) {
+ LOG.warn("Sleep interrupted : " +
+ StringUtils.stringifyException(e));
+ }
+ killTask(context);
+ }
/**
* Perform initializing actions required before a task can run.
@@ -110,4 +130,20 @@ abstract class TaskController implements
* @param tip Task of job for which localization happens.
*/
abstract void initializeJob(JobID jobId);
+
+ /**
+ * Sends a graceful terminate signal to the task JVM and its sub-processes.
+ *
+ * @param context task context
+ */
+ abstract void terminateTask(TaskControllerContext context);
+
+ /**
+ * Sends a KILL signal to forcefully terminate the taskJVM and its
+ * sub-processes.
+ *
+ * @param context task context
+ */
+
+ abstract void killTask(TaskControllerContext context);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=1076976&r1=1076975&r2=1076976&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java Fri Mar 4 03:27:30 2011
@@ -23,17 +23,22 @@ import java.io.File;
import java.io.IOException;
import java.util.Random;
import java.util.Iterator;
+import java.util.StringTokenizer;
import junit.framework.TestCase;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -48,11 +53,11 @@ public class TestKillSubProcesses extend
.getLog(TestKillSubProcesses.class);
private static String TEST_ROOT_DIR = new File(System.getProperty(
- "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
+ "test.build.data", "/tmp"), "killjob").toURI().toString().replace(' ', '+');
private static JobClient jobClient = null;
- private static MiniMRCluster mr = null;
+ static MiniMRCluster mr = null;
private static Path scriptDir = null;
private static String scriptDirName = null;
private static String pid = null;
@@ -69,7 +74,7 @@ public class TestKillSubProcesses extend
conf.setJobName("testkilljobsubprocesses");
conf.setMapperClass(KillingMapperWithChildren.class);
- scriptDir = new Path(TEST_ROOT_DIR + "/script");
+ scriptDir = new Path(TEST_ROOT_DIR , "script");
RunningJob job = runJobAndSetProcessHandle(jt, conf);
// kill the job now
@@ -180,9 +185,8 @@ public class TestKillSubProcesses extend
}
}
LOG.info("pid of map task is " + pid);
-
- // Checking if the map task is alive
- assertTrue(ProcessTree.isAlive(pid));
+ //Checking if the map task is alive
+ assertTrue("Map is no more alive", isAlive(pid));
LOG.info("The map task is alive before Job completion, as expected.");
}
}
@@ -215,7 +219,7 @@ public class TestKillSubProcesses extend
" is " + childPid);
assertTrue("Unexpected: The subprocess at level " + i +
" in the subtree is not alive before Job completion",
- ProcessTree.isAlive(childPid));
+ isAlive(childPid));
}
}
return job;
@@ -249,10 +253,10 @@ public class TestKillSubProcesses extend
" is " + childPid);
assertTrue("Unexpected: The subprocess at level " + i +
" in the subtree is alive after Job completion",
- !ProcessTree.isAlive(childPid));
+ !isAlive(childPid));
}
}
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = FileSystem.getLocal(mr.createJobConf());
if(fs.exists(scriptDir)) {
fs.delete(scriptDir, true);
}
@@ -260,10 +264,23 @@ public class TestKillSubProcesses extend
private static RunningJob runJob(JobConf conf) throws IOException {
- final Path inDir = new Path(TEST_ROOT_DIR + "/killjob/input");
- final Path outDir = new Path(TEST_ROOT_DIR + "/killjob/output");
+ final Path inDir;
+ final Path outDir;
+ FileSystem fs = FileSystem.getLocal(conf);
+ FileSystem tempFs = FileSystem.get(conf);
+ //Check if test is run with hdfs or local file system.
+ //if local filesystem then prepend TEST_ROOT_DIR, otherwise
+ //killjob folder would be created in workspace root.
+ if (!tempFs.getUri().toASCIIString().equals(
+ fs.getUri().toASCIIString())) {
+ inDir = new Path("killjob/input");
+ outDir = new Path("killjob/output");
+ } else {
+ inDir = new Path(TEST_ROOT_DIR, "input");
+ outDir = new Path(TEST_ROOT_DIR, "output");
+ }
- FileSystem fs = FileSystem.get(conf);
+
if(fs.exists(scriptDir)) {
fs.delete(scriptDir, true);
}
@@ -289,9 +306,7 @@ public class TestKillSubProcesses extend
// run the TCs
conf = mr.createJobConf();
JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
- runKillingJobAndValidate(jt, conf);
- runFailingJobAndValidate(jt, conf);
- runSuccessfulJobAndValidate(jt, conf);
+ runTests(conf, jt);
} finally {
if (mr != null) {
mr.shutdown();
@@ -299,12 +314,25 @@ public class TestKillSubProcesses extend
}
}
+ void runTests(JobConf conf, JobTracker jt) throws IOException {
+ FileSystem fs = FileSystem.getLocal(mr.createJobConf());
+ Path rootDir = new Path(TEST_ROOT_DIR);
+ if(!fs.exists(rootDir)) {
+ fs.mkdirs(rootDir);
+ }
+ fs.setPermission(rootDir,
+ new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+ runKillingJobAndValidate(jt, conf);
+ runFailingJobAndValidate(jt, conf);
+ runSuccessfulJobAndValidate(jt, conf);
+ }
+
/**
* Creates signal file
*/
private static void signalTask(String signalFile, JobConf conf) {
try {
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = FileSystem.getLocal(conf);
fs.createNewFile(new Path(signalFile));
} catch(IOException e) {
LOG.warn("Unable to create signal file. " + e);
@@ -316,10 +344,12 @@ public class TestKillSubProcesses extend
*/
private static void runChildren(JobConf conf) throws IOException {
if (ProcessTree.isSetsidAvailable) {
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = FileSystem.getLocal(conf);
TEST_ROOT_DIR = new Path(conf.get("test.build.data")).toUri().getPath();
- scriptDir = new Path(TEST_ROOT_DIR + "/script");
-
+ scriptDir = new Path(TEST_ROOT_DIR + "/script");
+ if(fs.exists(scriptDir)){
+ fs.delete(scriptDir, true);
+ }
// create shell script
Random rm = new Random();
Path scriptPath = new Path(scriptDir, "_shellScript_" + rm.nextInt()
@@ -328,6 +358,7 @@ public class TestKillSubProcesses extend
String script =
"echo $$ > " + scriptDir.toString() + "/childPidFile" + "$1\n" +
"echo hello\n" +
+ "trap 'echo got SIGTERM' 15 \n" +
"if [ $1 != 0 ]\nthen\n" +
" sh " + shellScript + " $(($1-1))\n" +
"else\n" +
@@ -446,4 +477,46 @@ public class TestKillSubProcesses extend
throw new RuntimeException("failing map");
}
}
+
+ /**
+ * Checks whether the process with the given pid is currently alive,
+ * based on the output of ps.
+ *
+ * @param pid pid of the process
+ * @return true if the process is alive, false otherwise.
+ */
+ private static boolean isAlive(String pid) throws IOException {
+ String commandString ="ps -o pid,command -e";
+ String args[] = new String[] {"bash", "-c" , commandString};
+ ShellCommandExecutor shExec = new ShellCommandExecutor(args);
+ try {
+ shExec.execute();
+ }catch(ExitCodeException e) {
+ return false;
+ } catch (IOException e) {
+ LOG.warn("IOExecption thrown while checking if process is alive" +
+ StringUtils.stringifyException(e));
+ throw e;
+ }
+
+ String output = shExec.getOutput();
+
+ //Parse the command output and check for the pid, ignoring commands
+ //which have ps or grep in them.
+ StringTokenizer strTok = new StringTokenizer(output, "\n");
+ boolean found = false;
+ while(strTok.hasMoreTokens()) {
+ StringTokenizer pidToken = new StringTokenizer(strTok.nextToken(),
+ " ");
+ String pidStr = pidToken.nextToken();
+ String commandStr = pidToken.nextToken();
+ if(pid.equals(pidStr) && !(commandStr.contains("ps")
+ || commandStr.contains("grep"))) {
+ found = true;
+ break;
+ }
+ }
+ return found;
+ }
+
}