You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/04/06 21:22:59 UTC
[29/50] [abbrv] hadoop git commit: YARN-3365. Enhanced NodeManager to
support using the 'tc' tool via container-executor for outbound network
traffic control. Contributed by Sidharta Seethana.
YARN-3365. Enhanced NodeManager to support using the 'tc' tool via container-executor for outbound network traffic control. Contributed by Sidharta Seethana.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4f66d408
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4f66d408
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4f66d408
Branch: refs/heads/YARN-2928
Commit: 4f66d40802a81de710ad8b1909abee2e20e1007d
Parents: 503e490
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Thu Apr 2 16:53:59 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon Apr 6 12:08:14 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 4 +
.../impl/container-executor.c | 100 +++-
.../impl/container-executor.h | 34 +-
.../main/native/container-executor/impl/main.c | 465 +++++++++++++------
.../nodemanager/TestLinuxContainerExecutor.java | 421 +++++++++++------
5 files changed, 722 insertions(+), 302 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f66d408/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 18a3c37..bcd2286 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -112,6 +112,10 @@ Release 2.8.0 - UNRELEASED
YARN-3345. Add non-exclusive node label API. (Wangda Tan via jianhe)
+ YARN-3365. Enhanced NodeManager to support using the 'tc' tool via
+ container-executor for outbound network traffic control. (Sidharta Seethana
+ via vinodkv)
+
IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f66d408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
index edfd25f..485399a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
@@ -40,6 +40,12 @@ static const int DEFAULT_MIN_USERID = 1000;
static const char* DEFAULT_BANNED_USERS[] = {"yarn", "mapred", "hdfs", "bin", 0};
+//location of traffic control binary
+static const char* TC_BIN = "/sbin/tc";
+static const char* TC_MODIFY_STATE_OPTS [] = { "-b" , NULL};
+static const char* TC_READ_STATE_OPTS [] = { "-b", NULL};
+static const char* TC_READ_STATS_OPTS [] = { "-s", "-b", NULL};
+
//struct to store the user details
struct passwd *user_detail = NULL;
@@ -291,27 +297,20 @@ static int write_exit_code_file(const char* exit_code_file, int exit_code) {
return 0;
}
-/**
- * Wait for the container process to exit and write the exit code to
- * the exit code file.
- * Returns the exit code of the container process.
- */
-static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
+static int wait_and_get_exit_code(pid_t pid) {
int child_status = -1;
int exit_code = -1;
int waitpid_result;
- if (change_effective_user(nm_uid, nm_gid) != 0) {
- return -1;
- }
do {
- waitpid_result = waitpid(pid, &child_status, 0);
+ waitpid_result = waitpid(pid, &child_status, 0);
} while (waitpid_result == -1 && errno == EINTR);
+
if (waitpid_result < 0) {
- fprintf(LOGFILE, "Error waiting for container process %d - %s\n",
- pid, strerror(errno));
+ fprintf(LOGFILE, "error waiting for process %d - %s\n", pid, strerror(errno));
return -1;
}
+
if (WIFEXITED(child_status)) {
exit_code = WEXITSTATUS(child_status);
} else if (WIFSIGNALED(child_status)) {
@@ -319,9 +318,26 @@ static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
} else {
fprintf(LOGFILE, "Unable to determine exit status for pid %d\n", pid);
}
+
+ return exit_code;
+}
+
+/**
+ * Wait for the container process to exit and write the exit code to
+ * the exit code file.
+ * Returns the exit code of the container process.
+ */
+static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
+ int exit_code = -1;
+
+ if (change_effective_user(nm_uid, nm_gid) != 0) {
+ return -1;
+ }
+ exit_code = wait_and_get_exit_code(pid);
if (write_exit_code_file(exit_code_file, exit_code) < 0) {
return -1;
}
+
return exit_code;
}
@@ -1470,3 +1486,63 @@ int mount_cgroup(const char *pair, const char *hierarchy) {
#endif
}
+static int run_traffic_control(const char *opts[], char *command_file) {
+ const int max_tc_args = 16;
+ char *args[max_tc_args];
+ int i = 0, j = 0;
+
+ args[i++] = TC_BIN;
+ while (opts[j] != NULL && i < max_tc_args - 1) {
+ args[i] = opts[j];
+ ++i, ++j;
+ }
+ //too many args to tc
+ if (i == max_tc_args - 1) {
+ fprintf(LOGFILE, "too many args to tc");
+ return TRAFFIC_CONTROL_EXECUTION_FAILED;
+ }
+ args[i++] = command_file;
+ args[i] = 0;
+
+ pid_t child_pid = fork();
+ if (child_pid != 0) {
+ int exit_code = wait_and_get_exit_code(child_pid);
+ if (exit_code != 0) {
+ fprintf(LOGFILE, "failed to execute tc command!\n");
+ return TRAFFIC_CONTROL_EXECUTION_FAILED;
+ }
+ unlink(command_file);
+ return 0;
+ } else {
+ execv(TC_BIN, args);
+ //if we reach here, exec failed
+ fprintf(LOGFILE, "failed to execute tc command! error: %s\n", strerror(errno));
+ return TRAFFIC_CONTROL_EXECUTION_FAILED;
+ }
+}
+
+/**
+ * Run a batch of tc commands that modify interface configuration. command_file
+ * is deleted after being used.
+ */
+int traffic_control_modify_state(char *command_file) {
+ return run_traffic_control(TC_MODIFY_STATE_OPTS, command_file);
+}
+
+/**
+ * Run a batch of tc commands that read interface configuration. Output is
+ * written to standard output and it is expected to be read and parsed by the
+ * calling process. command_file is deleted after being used.
+ */
+int traffic_control_read_state(char *command_file) {
+ return run_traffic_control(TC_READ_STATE_OPTS, command_file);
+}
+
+/**
+ * Run a batch of tc commands that read interface stats. Output is
+ * written to standard output and it is expected to be read and parsed by the
+ * calling process. command_file is deleted after being used.
+ */
+int traffic_control_read_stats(char *command_file) {
+ return run_traffic_control(TC_READ_STATS_OPTS, command_file);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f66d408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h
index b1efd6a..43ef98d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h
@@ -54,7 +54,20 @@ enum errorcodes {
INVALID_CONFIG_FILE = 24,
SETSID_OPER_FAILED = 25,
WRITE_PIDFILE_FAILED = 26,
- WRITE_CGROUP_FAILED = 27
+ WRITE_CGROUP_FAILED = 27,
+ TRAFFIC_CONTROL_EXECUTION_FAILED = 28
+};
+
+enum operations {
+ CHECK_SETUP = 1,
+ MOUNT_CGROUPS = 2,
+ TRAFFIC_CONTROL_MODIFY_STATE = 3,
+ TRAFFIC_CONTROL_READ_STATE = 4,
+ TRAFFIC_CONTROL_READ_STATS = 5,
+ RUN_AS_USER_INITIALIZE_CONTAINER = 6,
+ RUN_AS_USER_LAUNCH_CONTAINER = 7,
+ RUN_AS_USER_SIGNAL_CONTAINER = 8,
+ RUN_AS_USER_DELETE = 9
};
#define NM_GROUP_KEY "yarn.nodemanager.linux-container-executor.group"
@@ -209,3 +222,22 @@ int check_dir(char* npath, mode_t st_mode, mode_t desired,
int create_validate_dir(char* npath, mode_t perm, char* path,
int finalComponent);
+
+/**
+ * Run a batch of tc commands that modify interface configuration
+ */
+int traffic_control_modify_state(char *command_file);
+
+/**
+ * Run a batch of tc commands that read interface configuration. Output is
+ * written to standard output and it is expected to be read and parsed by the
+ * calling process.
+ */
+int traffic_control_read_state(char *command_file);
+
+/**
+ * Run a batch of tc commands that read interface stats. Output is
+ * written to standard output and it is expected to be read and parsed by the
+ * calling process.
+ */
+int traffic_control_read_stats(char *command_file);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f66d408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
index 9b5e784..63fbfe4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
@@ -42,84 +42,71 @@
#error HADOOP_CONF_DIR must be defined
#endif
-void display_usage(FILE *stream) {
- fprintf(stream,
- "Usage: container-executor --checksetup\n");
- fprintf(stream,
- "Usage: container-executor --mount-cgroups "\
- "hierarchy controller=path...\n");
- fprintf(stream,
- "Usage: container-executor user yarn-user command command-args\n");
- fprintf(stream, "Commands:\n");
- fprintf(stream, " initialize container: %2d appid tokens " \
- "nm-local-dirs nm-log-dirs cmd app...\n", INITIALIZE_CONTAINER);
- fprintf(stream,
- " launch container: %2d appid containerid workdir "\
- "container-script tokens pidfile nm-local-dirs nm-log-dirs resources\n",
- LAUNCH_CONTAINER);
- fprintf(stream, " signal container: %2d container-pid signal\n",
- SIGNAL_CONTAINER);
- fprintf(stream, " delete as user: %2d relative-path\n",
- DELETE_AS_USER);
+static void display_usage(FILE *stream) {
+ char *usage_template =
+ "Usage: container-executor --checksetup\n" \
+ " container-executor --mount-cgroups <hierarchy> <controller=path>...\n" \
+ " container-executor --tc-modify-state <command-file>\n" \
+ " container-executor --tc-read-state <command-file>\n" \
+ " container-executor --tc-read-stats <command-file>\n" \
+ " container-executor <user> <yarn-user> <command> <command-args>\n" \
+ " where command and command-args: \n" \
+ " initialize container: %2d appid tokens nm-local-dirs nm-log-dirs cmd app...\n" \
+ " launch container: %2d appid containerid workdir container-script " \
+ "tokens pidfile nm-local-dirs nm-log-dirs resources optional-tc-command-file\n" \
+ " signal container: %2d container-pid signal\n" \
+ " delete as user: %2d relative-path\n" ;
+
+
+ fprintf(stream, usage_template, INITIALIZE_CONTAINER, LAUNCH_CONTAINER,
+ SIGNAL_CONTAINER, DELETE_AS_USER);
}
-int main(int argc, char **argv) {
- int invalid_args = 0;
- int do_check_setup = 0;
- int do_mount_cgroups = 0;
-
- LOGFILE = stdout;
- ERRORFILE = stderr;
+/* Sets up log files for normal/error logging */
+static void open_log_files() {
+ if (LOGFILE == NULL) {
+ LOGFILE = stdout;
+ }
- if (argc > 1) {
- if (strcmp("--mount-cgroups", argv[1]) == 0) {
- do_mount_cgroups = 1;
- }
+ if (ERRORFILE == NULL) {
+ ERRORFILE = stderr;
}
+}
- // Minimum number of arguments required to run
- // the std. container-executor commands is 4
- // 4 args not needed for checksetup option
- if (argc < 4 && !do_mount_cgroups) {
- invalid_args = 1;
- if (argc == 2) {
- const char *arg1 = argv[1];
- if (strcmp("--checksetup", arg1) == 0) {
- invalid_args = 0;
- do_check_setup = 1;
- }
- }
+/* Flushes and closes log files */
+static void flush_and_close_log_files() {
+ if (LOGFILE != NULL) {
+ fflush(LOGFILE);
+ fclose(LOGFILE);
+ LOGFILE = NULL;
}
- if (invalid_args != 0) {
- display_usage(stdout);
- return INVALID_ARGUMENT_NUMBER;
+if (ERRORFILE != NULL) {
+ fflush(ERRORFILE);
+ fclose(ERRORFILE);
+ ERRORFILE = NULL;
}
+}
- int command;
- const char * app_id = NULL;
- const char * container_id = NULL;
- const char * cred_file = NULL;
- const char * script_file = NULL;
- const char * current_dir = NULL;
- const char * pid_file = NULL;
-
- int exit_code = 0;
-
- char * dir_to_be_deleted = NULL;
+/** Validates the current container-executor setup. Causes program exit
+in case of validation failures. Also sets up configuration / group information etc.,
+This function is to be called in every invocation of container-executor, irrespective
+of whether an explicit checksetup operation is requested. */
+static void assert_valid_setup(char *current_executable) {
char *executable_file = get_executable();
char *orig_conf_file = HADOOP_CONF_DIR "/" CONF_FILENAME;
- char *conf_file = resolve_config_path(orig_conf_file, argv[0]);
- char *local_dirs, *log_dirs;
- char *resources, *resources_key, *resources_value;
+ char *conf_file = resolve_config_path(orig_conf_file, current_executable);
if (conf_file == NULL) {
fprintf(ERRORFILE, "Configuration file %s not found.\n", orig_conf_file);
+ flush_and_close_log_files();
exit(INVALID_CONFIG_FILE);
}
+
if (check_configuration_permissions(conf_file) != 0) {
+ flush_and_close_log_files();
exit(INVALID_CONFIG_FILE);
}
read_config(conf_file);
@@ -129,13 +116,14 @@ int main(int argc, char **argv) {
char *nm_group = get_value(NM_GROUP_KEY);
if (nm_group == NULL) {
fprintf(ERRORFILE, "Can't get configured value for %s.\n", NM_GROUP_KEY);
+ flush_and_close_log_files();
exit(INVALID_CONFIG_FILE);
}
struct group *group_info = getgrnam(nm_group);
if (group_info == NULL) {
fprintf(ERRORFILE, "Can't get group information for %s - %s.\n", nm_group,
strerror(errno));
- fflush(LOGFILE);
+ flush_and_close_log_files();
exit(INVALID_CONFIG_FILE);
}
set_nm_uid(getuid(), group_info->gr_gid);
@@ -146,91 +134,162 @@ int main(int argc, char **argv) {
if (check_executor_permissions(executable_file) != 0) {
fprintf(ERRORFILE, "Invalid permissions on container-executor binary.\n");
- return INVALID_CONTAINER_EXEC_PERMISSIONS;
+ flush_and_close_log_files();
+ exit(INVALID_CONTAINER_EXEC_PERMISSIONS);
}
+}
+
+
+/* Used to store parsed input parameters for various operations */
+static struct {
+ char *cgroups_hierarchy;
+ char *traffic_control_command_file;
+ const char * run_as_user_name;
+ const char * yarn_user_name;
+ char *local_dirs;
+ char *log_dirs;
+ char *resources_key;
+ char *resources_value;
+ char **resources_values;
+ const char * app_id;
+ const char * container_id;
+ const char * cred_file;
+ const char * script_file;
+ const char * current_dir;
+ const char * pid_file;
+ const char *dir_to_be_deleted;
+ int container_pid;
+ int signal;
+} cmd_input;
+
+static int validate_run_as_user_commands(int argc, char **argv, int *operation);
+
+/* Validates that arguments used in the invocation are valid. In case of validation
+failure, an 'errorcode' is returned. In case of successful validation, a zero is
+returned and 'operation' is populated based on the operation being requested.
+Ideally, we should re-factor container-executor to use a more structured, command
+line parsing mechanism (e.g getopt). For the time being, we'll use this manual
+validation mechanism so that we don't have to change the invocation interface.
+*/
- if (do_check_setup != 0) {
- // basic setup checks done
- // verified configs available and valid
- // verified executor permissions
+static int validate_arguments(int argc, char **argv , int *operation) {
+ if (argc < 2) {
+ display_usage(stdout);
+ return INVALID_ARGUMENT_NUMBER;
+ }
+
+ if (strcmp("--checksetup", argv[1]) == 0) {
+ *operation = CHECK_SETUP;
return 0;
}
- if (do_mount_cgroups) {
+ if (strcmp("--mount-cgroups", argv[1]) == 0) {
+ if (argc < 4) {
+ display_usage(stdout);
+ return INVALID_ARGUMENT_NUMBER;
+ }
optind++;
- char *hierarchy = argv[optind++];
- int result = 0;
+ cmd_input.cgroups_hierarchy = argv[optind++];
+ *operation = MOUNT_CGROUPS;
+ return 0;
+ }
- while (optind < argc && result == 0) {
- result = mount_cgroup(argv[optind++], hierarchy);
+ if (strcmp("--tc-modify-state", argv[1]) == 0) {
+ if (argc != 3) {
+ display_usage(stdout);
+ return INVALID_ARGUMENT_NUMBER;
}
-
- return result;
+ optind++;
+ cmd_input.traffic_control_command_file = argv[optind++];
+ *operation = TRAFFIC_CONTROL_MODIFY_STATE;
+ return 0;
}
- //checks done for user name
- if (argv[optind] == NULL) {
- fprintf(ERRORFILE, "Invalid user name.\n");
- return INVALID_USER_NAME;
+ if (strcmp("--tc-read-state", argv[1]) == 0) {
+ if (argc != 3) {
+ display_usage(stdout);
+ return INVALID_ARGUMENT_NUMBER;
+ }
+ optind++;
+ cmd_input.traffic_control_command_file = argv[optind++];
+ *operation = TRAFFIC_CONTROL_READ_STATE;
+ return 0;
}
- int ret = set_user(argv[optind]);
- if (ret != 0) {
- return ret;
+ if (strcmp("--tc-read-stats", argv[1]) == 0) {
+ if (argc != 3) {
+ display_usage(stdout);
+ return INVALID_ARGUMENT_NUMBER;
+ }
+ optind++;
+ cmd_input.traffic_control_command_file = argv[optind++];
+ *operation = TRAFFIC_CONTROL_READ_STATS;
+ return 0;
}
- // this string is used for building pathnames, the
- // process management is done based on the 'user_detail'
- // global, which was set by 'set_user()' above
- optind = optind + 1;
- char *yarn_user_name = argv[optind];
- if (yarn_user_name == NULL) {
- fprintf(ERRORFILE, "Invalid yarn user name.\n");
- return INVALID_USER_NAME;
+ /* Now we have to validate 'run as user' operations that don't use
+ a 'long option' - we should fix this at some point. The validation/argument
+ parsing here is extensive enough that it is done in a separate function */
+
+ return validate_run_as_user_commands(argc, argv, operation);
+}
+
+/* Parse/validate 'run as user' commands */
+static int validate_run_as_user_commands(int argc, char **argv, int *operation) {
+ /* We need at least the following arguments in order to proceed further :
+ <user>, <yarn-user>, <command> - i.e. argc should be at least 4 */
+
+ if (argc < 4) {
+ display_usage(stdout);
+ return INVALID_ARGUMENT_NUMBER;
}
-
- optind = optind + 1;
- command = atoi(argv[optind++]);
- fprintf(LOGFILE, "main : command provided %d\n",command);
- fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
- fprintf(LOGFILE, "main : requested yarn user is %s\n", yarn_user_name);
+ cmd_input.run_as_user_name = argv[optind++];
+ cmd_input.yarn_user_name = argv[optind++];
+ int command = atoi(argv[optind++]);
+
+ fprintf(LOGFILE, "main : command provided %d\n", command);
+ fprintf(LOGFILE, "main : run as user is %s\n", cmd_input.run_as_user_name);
+ fprintf(LOGFILE, "main : requested yarn user is %s\n", cmd_input.yarn_user_name);
fflush(LOGFILE);
switch (command) {
case INITIALIZE_CONTAINER:
if (argc < 9) {
fprintf(ERRORFILE, "Too few arguments (%d vs 9) for initialize container\n",
- argc);
+ argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER;
}
- app_id = argv[optind++];
- cred_file = argv[optind++];
- local_dirs = argv[optind++];// good local dirs as a comma separated list
- log_dirs = argv[optind++];// good log dirs as a comma separated list
- exit_code = initialize_app(yarn_user_name, app_id, cred_file,
- extract_values(local_dirs),
- extract_values(log_dirs), argv + optind);
- break;
+ cmd_input.app_id = argv[optind++];
+ cmd_input.cred_file = argv[optind++];
+ cmd_input.local_dirs = argv[optind++];// good local dirs as a comma separated list
+ cmd_input.log_dirs = argv[optind++];// good log dirs as a comma separated list
+
+ *operation = RUN_AS_USER_INITIALIZE_CONTAINER;
+ return 0;
+
case LAUNCH_CONTAINER:
- if (argc != 13) {
- fprintf(ERRORFILE, "Wrong number of arguments (%d vs 13) for launch container\n",
- argc);
+ //kill me now.
+ if (!(argc == 13 || argc == 14)) {
+ fprintf(ERRORFILE, "Wrong number of arguments (%d vs 13 or 14) for launch container\n",
+ argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER;
}
- app_id = argv[optind++];
- container_id = argv[optind++];
- current_dir = argv[optind++];
- script_file = argv[optind++];
- cred_file = argv[optind++];
- pid_file = argv[optind++];
- local_dirs = argv[optind++];// good local dirs as a comma separated list
- log_dirs = argv[optind++];// good log dirs as a comma separated list
- resources = argv[optind++];// key,value pair describing resources
- char *resources_key = malloc(strlen(resources));
- char *resources_value = malloc(strlen(resources));
+
+ cmd_input.app_id = argv[optind++];
+ cmd_input.container_id = argv[optind++];
+ cmd_input.current_dir = argv[optind++];
+ cmd_input.script_file = argv[optind++];
+ cmd_input.cred_file = argv[optind++];
+ cmd_input.pid_file = argv[optind++];
+ cmd_input.local_dirs = argv[optind++];// good local dirs as a comma separated list
+ cmd_input.log_dirs = argv[optind++];// good log dirs as a comma separated list
+ char * resources = argv[optind++];// key,value pair describing resources
+ char * resources_key = malloc(strlen(resources));
+ char * resources_value = malloc(strlen(resources));
+
if (get_kv_key(resources, resources_key, strlen(resources)) < 0 ||
get_kv_value(resources, resources_value, strlen(resources)) < 0) {
fprintf(ERRORFILE, "Invalid arguments for cgroups resources: %s",
@@ -240,51 +299,157 @@ int main(int argc, char **argv) {
free(resources_value);
return INVALID_ARGUMENT_NUMBER;
}
- char** resources_values = extract_values(resources_value);
- exit_code = launch_container_as_user(yarn_user_name, app_id,
- container_id, current_dir, script_file, cred_file,
- pid_file, extract_values(local_dirs),
- extract_values(log_dirs), resources_key,
- resources_values);
- free(resources_key);
- free(resources_value);
- break;
+
+ //network isolation through tc
+ if (argc == 14) {
+ cmd_input.traffic_control_command_file = argv[optind++];
+ }
+
+ cmd_input.resources_key = resources_key;
+ cmd_input.resources_value = resources_value;
+ cmd_input.resources_values = extract_values(resources_value);
+ *operation = RUN_AS_USER_LAUNCH_CONTAINER;
+ return 0;
+
case SIGNAL_CONTAINER:
if (argc != 6) {
fprintf(ERRORFILE, "Wrong number of arguments (%d vs 6) for " \
"signal container\n", argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER;
- } else {
- char* end_ptr = NULL;
- char* option = argv[optind++];
- int container_pid = strtol(option, &end_ptr, 10);
- if (option == end_ptr || *end_ptr != '\0') {
- fprintf(ERRORFILE, "Illegal argument for container pid %s\n", option);
- fflush(ERRORFILE);
- return INVALID_ARGUMENT_NUMBER;
- }
- option = argv[optind++];
- int signal = strtol(option, &end_ptr, 10);
- if (option == end_ptr || *end_ptr != '\0') {
- fprintf(ERRORFILE, "Illegal argument for signal %s\n", option);
- fflush(ERRORFILE);
- return INVALID_ARGUMENT_NUMBER;
- }
- exit_code = signal_container_as_user(yarn_user_name, container_pid, signal);
}
- break;
+
+ char* end_ptr = NULL;
+ char* option = argv[optind++];
+ cmd_input.container_pid = strtol(option, &end_ptr, 10);
+ if (option == end_ptr || *end_ptr != '\0') {
+ fprintf(ERRORFILE, "Illegal argument for container pid %s\n", option);
+ fflush(ERRORFILE);
+ return INVALID_ARGUMENT_NUMBER;
+ }
+ option = argv[optind++];
+ cmd_input.signal = strtol(option, &end_ptr, 10);
+ if (option == end_ptr || *end_ptr != '\0') {
+ fprintf(ERRORFILE, "Illegal argument for signal %s\n", option);
+ fflush(ERRORFILE);
+ return INVALID_ARGUMENT_NUMBER;
+ }
+
+ *operation = RUN_AS_USER_SIGNAL_CONTAINER;
+ return 0;
+
case DELETE_AS_USER:
- dir_to_be_deleted = argv[optind++];
- exit_code= delete_as_user(yarn_user_name, dir_to_be_deleted,
- argv + optind);
- break;
+ cmd_input.dir_to_be_deleted = argv[optind++];
+ *operation = RUN_AS_USER_DELETE;
+ return 0;
default:
fprintf(ERRORFILE, "Invalid command %d not supported.",command);
fflush(ERRORFILE);
- exit_code = INVALID_COMMAND_PROVIDED;
+ return INVALID_COMMAND_PROVIDED;
}
- fclose(LOGFILE);
- fclose(ERRORFILE);
+}
+
+int main(int argc, char **argv) {
+ open_log_files();
+ assert_valid_setup(argv[0]);
+
+ int operation;
+ int ret = validate_arguments(argc, argv, &operation);
+
+ if (ret != 0) {
+ flush_and_close_log_files();
+ return ret;
+ }
+
+ int exit_code = 0;
+
+ switch (operation) {
+ case CHECK_SETUP:
+ //we already did this
+ exit_code = 0;
+ break;
+ case MOUNT_CGROUPS:
+ exit_code = 0;
+
+ while (optind < argc && exit_code == 0) {
+ exit_code = mount_cgroup(argv[optind++], cmd_input.cgroups_hierarchy);
+ }
+
+ break;
+ case TRAFFIC_CONTROL_MODIFY_STATE:
+ exit_code = traffic_control_modify_state(cmd_input.traffic_control_command_file);
+ break;
+ case TRAFFIC_CONTROL_READ_STATE:
+ exit_code = traffic_control_read_state(cmd_input.traffic_control_command_file);
+ break;
+ case TRAFFIC_CONTROL_READ_STATS:
+ exit_code = traffic_control_read_stats(cmd_input.traffic_control_command_file);
+ break;
+ case RUN_AS_USER_INITIALIZE_CONTAINER:
+ exit_code = set_user(cmd_input.run_as_user_name);
+ if (exit_code != 0) {
+ break;
+ }
+
+ exit_code = initialize_app(cmd_input.yarn_user_name,
+ cmd_input.app_id,
+ cmd_input.cred_file,
+ extract_values(cmd_input.local_dirs),
+ extract_values(cmd_input.log_dirs),
+ argv + optind);
+ break;
+ case RUN_AS_USER_LAUNCH_CONTAINER:
+ if (cmd_input.traffic_control_command_file != NULL) {
+ //apply tc rules before switching users and launching the container
+ exit_code = traffic_control_modify_state(cmd_input.traffic_control_command_file);
+ if( exit_code != 0) {
+ //failed to apply tc rules - break out before launching the container
+ break;
+ }
+ }
+
+ exit_code = set_user(cmd_input.run_as_user_name);
+ if (exit_code != 0) {
+ break;
+ }
+
+ exit_code = launch_container_as_user(cmd_input.yarn_user_name,
+ cmd_input.app_id,
+ cmd_input.container_id,
+ cmd_input.current_dir,
+ cmd_input.script_file,
+ cmd_input.cred_file,
+ cmd_input.pid_file,
+ extract_values(cmd_input.local_dirs),
+ extract_values(cmd_input.log_dirs),
+ cmd_input.resources_key,
+ cmd_input.resources_values);
+ free(cmd_input.resources_key);
+ free(cmd_input.resources_value);
+ free(cmd_input.resources_values);
+ break;
+ case RUN_AS_USER_SIGNAL_CONTAINER:
+ exit_code = set_user(cmd_input.run_as_user_name);
+ if (exit_code != 0) {
+ break;
+ }
+
+ exit_code = signal_container_as_user(cmd_input.yarn_user_name,
+ cmd_input.container_pid,
+ cmd_input.signal);
+ break;
+ case RUN_AS_USER_DELETE:
+ exit_code = set_user(cmd_input.run_as_user_name);
+ if (exit_code != 0) {
+ break;
+ }
+
+ exit_code = delete_as_user(cmd_input.yarn_user_name,
+ cmd_input.dir_to_be_deleted,
+ argv + optind);
+ break;
+ }
+
+ flush_and_close_log_files();
return exit_code;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f66d408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
index 7417f69..723ac92 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
@@ -1,20 +1,20 @@
/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.yarn.server.nodemanager;
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -32,13 +33,13 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -61,57 +62,88 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
/**
- * This is intended to test the LinuxContainerExecutor code, but because of
- * some security restrictions this can only be done with some special setup
- * first.
- * <br><ol>
+ * This is intended to test the LinuxContainerExecutor code, but because of some
+ * security restrictions this can only be done with some special setup first. <br>
+ * <ol>
* <li>Compile the code with container-executor.conf.dir set to the location you
- * want for testing.
- * <br><pre><code>
+ * want for testing. <br>
+ *
+ * <pre>
+ * <code>
* > mvn clean install -Pnative -Dcontainer-executor.conf.dir=/etc/hadoop
* -DskipTests
- * </code></pre>
+ * </code>
+ * </pre>
*
* <li>Set up <code>${container-executor.conf.dir}/container-executor.cfg</code>
* container-executor.cfg needs to be owned by root and have in it the proper
- * config values.
- * <br><pre><code>
+ * config values. <br>
+ *
+ * <pre>
+ * <code>
* > cat /etc/hadoop/container-executor.cfg
* yarn.nodemanager.linux-container-executor.group=mapred
* #depending on the user id of the application.submitter option
* min.user.id=1
* > sudo chown root:root /etc/hadoop/container-executor.cfg
* > sudo chmod 444 /etc/hadoop/container-executor.cfg
- * </code></pre>
+ * </code>
+ * </pre>
*
- * <li>Move the binary and set proper permissions on it. It needs to be owned
- * by root, the group needs to be the group configured in container-executor.cfg,
+ * <li>Move the binary and set proper permissions on it. It needs to be owned by
+ * root, the group needs to be the group configured in container-executor.cfg,
* and it needs the setuid bit set. (The build will also overwrite it so you
- * need to move it to a place that you can support it.
- * <br><pre><code>
+ * need to move it to a place that you can support it.) <br>
+ *
+ * <pre>
+ * <code>
* > cp ./hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/container-executor /tmp/
* > sudo chown root:mapred /tmp/container-executor
- * > sudo chmod 4550 /tmp/container-executor
- * </code></pre>
+ * > sudo chmod 4050 /tmp/container-executor
+ * </code>
+ * </pre>
*
* <li>Run the tests with the execution enabled (The user you run the tests as
- * needs to be part of the group from the config.
- * <br><pre><code>
+ * needs to be part of the group from the config.) <br>
+ *
+ * <pre>
+ * <code>
* mvn test -Dtest=TestLinuxContainerExecutor -Dapplication.submitter=nobody -Dcontainer-executor.path=/tmp/container-executor
- * </code></pre>
+ * </code>
+ * </pre>
+ *
+ * <li>The test suite also contains tests that exercise mounting of CGroups. By
+ * default, these tests are not run. To run them, add
+ * <code>-Dcgroups.mount=&lt;mount-point&gt;</code>. Please note that the test does
+ * not unmount the CGroups at the end of the test, since that requires root permissions. <br>
+ *
+ * <li>The tests are sensitive to directory permissions. All parent
+ * directories must be searchable by the user that the tasks are run as. If you
+ * wish to run the tests in a different directory, please set it using
+ * <code>-Dworkspace.dir=&lt;directory&gt;</code> (the default is <code>target</code>).
+ *
* </ol>
*/
public class TestLinuxContainerExecutor {
private static final Log LOG = LogFactory
- .getLog(TestLinuxContainerExecutor.class);
-
- private static File workSpace = new File("target",
- TestLinuxContainerExecutor.class.getName() + "-workSpace");
-
+ .getLog(TestLinuxContainerExecutor.class);
+
+ private static File workSpace;
+ static {
+ String basedir = System.getProperty("workspace.dir");
+ if(basedir == null || basedir.isEmpty()) {
+ basedir = "target";
+ }
+ workSpace = new File(basedir,
+ TestLinuxContainerExecutor.class.getName() + "-workSpace");
+ }
+
private LinuxContainerExecutor exec = null;
private String appSubmitter = null;
private LocalDirsHandlerService dirsHandler;
@@ -125,20 +157,26 @@ public class TestLinuxContainerExecutor {
files.mkdir(workSpacePath, null, true);
FileUtil.chmod(workSpace.getAbsolutePath(), "777");
File localDir = new File(workSpace.getAbsoluteFile(), "localDir");
- files.mkdir(new Path(localDir.getAbsolutePath()),
- new FsPermission("777"), false);
+ files.mkdir(new Path(localDir.getAbsolutePath()), new FsPermission("777"),
+ false);
File logDir = new File(workSpace.getAbsoluteFile(), "logDir");
- files.mkdir(new Path(logDir.getAbsolutePath()),
- new FsPermission("777"), false);
+ files.mkdir(new Path(logDir.getAbsolutePath()), new FsPermission("777"),
+ false);
String exec_path = System.getProperty("container-executor.path");
- if(exec_path != null && !exec_path.isEmpty()) {
+ if (exec_path != null && !exec_path.isEmpty()) {
conf = new Configuration(false);
conf.setClass("fs.AbstractFileSystem.file.impl",
org.apache.hadoop.fs.local.LocalFs.class,
org.apache.hadoop.fs.AbstractFileSystem.class);
- conf.set(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, "xuan");
- LOG.info("Setting "+YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH
- +"="+exec_path);
+
+ appSubmitter = System.getProperty("application.submitter");
+ if (appSubmitter == null || appSubmitter.isEmpty()) {
+ appSubmitter = "nobody";
+ }
+
+ conf.set(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, appSubmitter);
+ LOG.info("Setting " + YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH
+ + "=" + exec_path);
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, exec_path);
exec = new LinuxContainerExecutor();
exec.setConf(conf);
@@ -146,34 +184,86 @@ public class TestLinuxContainerExecutor {
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.getAbsolutePath());
dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
+ List<String> localDirs = dirsHandler.getLocalDirs();
+ for (String dir : localDirs) {
+ Path userDir = new Path(dir, ContainerLocalizer.USERCACHE);
+ files.mkdir(userDir, new FsPermission("777"), false);
+ // $local/filecache
+ Path fileDir = new Path(dir, ContainerLocalizer.FILECACHE);
+ files.mkdir(fileDir, new FsPermission("777"), false);
+ }
}
- appSubmitter = System.getProperty("application.submitter");
- if(appSubmitter == null || appSubmitter.isEmpty()) {
- appSubmitter = "nobody";
- }
+
}
@After
public void tearDown() throws Exception {
FileContext.getLocalFSFileContext().delete(
- new Path(workSpace.getAbsolutePath()), true);
+ new Path(workSpace.getAbsolutePath()), true);
+ }
+
+ private void cleanupUserAppCache(String user) throws Exception {
+ List<String> localDirs = dirsHandler.getLocalDirs();
+ for (String dir : localDirs) {
+ Path usercachedir = new Path(dir, ContainerLocalizer.USERCACHE);
+ Path userdir = new Path(usercachedir, user);
+ Path appcachedir = new Path(userdir, ContainerLocalizer.APPCACHE);
+ exec.deleteAsUser(user, appcachedir);
+ FileContext.getLocalFSFileContext().delete(usercachedir, true);
+ }
+ }
+
+ private void cleanupUserFileCache(String user) {
+ List<String> localDirs = dirsHandler.getLocalDirs();
+ for (String dir : localDirs) {
+ Path filecache = new Path(dir, ContainerLocalizer.FILECACHE);
+ Path filedir = new Path(filecache, user);
+ exec.deleteAsUser(user, filedir);
+ }
+ }
+
+ private void cleanupLogDirs(String user) {
+ List<String> logDirs = dirsHandler.getLogDirs();
+ for (String dir : logDirs) {
+ String appId = "APP_" + id;
+ String containerId = "CONTAINER_" + (id - 1);
+ Path appdir = new Path(dir, appId);
+ Path containerdir = new Path(appdir, containerId);
+ exec.deleteAsUser(user, containerdir);
+ }
+ }
+
+ private void cleanupAppFiles(String user) throws Exception {
+ cleanupUserAppCache(user);
+ cleanupUserFileCache(user);
+ cleanupLogDirs(user);
+
+ String[] files =
+ { "launch_container.sh", "container_tokens", "touch-file" };
+ Path ws = new Path(workSpace.toURI());
+ for (String file : files) {
+ File f = new File(workSpace, file);
+ if (f.exists()) {
+ exec.deleteAsUser(user, new Path(file), ws);
+ }
+ }
}
private boolean shouldRun() {
- if(exec == null) {
+ if (exec == null) {
LOG.warn("Not running test because container-executor.path is not set");
return false;
}
return true;
}
-
- private String writeScriptFile(String ... cmd) throws IOException {
+
+ private String writeScriptFile(String... cmd) throws IOException {
File f = File.createTempFile("TestLinuxContainerExecutor", ".sh");
f.deleteOnExit();
PrintWriter p = new PrintWriter(new FileOutputStream(f));
p.println("#!/bin/sh");
p.print("exec");
- for(String part: cmd) {
+ for (String part : cmd) {
p.print(" '");
p.print(part.replace("\\", "\\\\").replace("'", "\\'"));
p.print("'");
@@ -182,36 +272,36 @@ public class TestLinuxContainerExecutor {
p.close();
return f.getAbsolutePath();
}
-
+
private int id = 0;
+
private synchronized int getNextId() {
id += 1;
return id;
}
-
+
private ContainerId getNextContainerId() {
ContainerId cId = mock(ContainerId.class);
- String id = "CONTAINER_"+getNextId();
+ String id = "CONTAINER_" + getNextId();
when(cId.toString()).thenReturn(id);
return cId;
}
-
- private int runAndBlock(String ... cmd) throws IOException {
+ private int runAndBlock(String... cmd) throws IOException {
return runAndBlock(getNextContainerId(), cmd);
}
-
- private int runAndBlock(ContainerId cId, String ... cmd) throws IOException {
- String appId = "APP_"+getNextId();
+
+ private int runAndBlock(ContainerId cId, String... cmd) throws IOException {
+ String appId = "APP_" + getNextId();
Container container = mock(Container.class);
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
- HashMap<String, String> env = new HashMap<String,String>();
+ HashMap<String, String> env = new HashMap<String, String>();
when(container.getContainerId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context);
when(context.getEnvironment()).thenReturn(env);
-
+
String script = writeScriptFile(cmd);
Path scriptPath = new Path(script);
@@ -221,46 +311,36 @@ public class TestLinuxContainerExecutor {
exec.activateContainer(cId, pidFile);
return exec.launchContainer(container, scriptPath, tokensPath,
- appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
- dirsHandler.getLogDirs());
+ appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
+ dirsHandler.getLogDirs());
}
-
+
@Test
public void testContainerLocalizer() throws Exception {
- if (!shouldRun()) {
- return;
- }
- List<String> localDirs = dirsHandler.getLocalDirs();
- List<String> logDirs = dirsHandler.getLogDirs();
- for (String localDir : localDirs) {
- Path userDir =
- new Path(localDir, ContainerLocalizer.USERCACHE);
- files.mkdir(userDir, new FsPermission("777"), false);
- // $local/filecache
- Path fileDir =
- new Path(localDir, ContainerLocalizer.FILECACHE);
- files.mkdir(fileDir, new FsPermission("777"), false);
- }
+
+ Assume.assumeTrue(shouldRun());
+
String locId = "container_01_01";
Path nmPrivateContainerTokensPath =
- dirsHandler.getLocalPathForWrite(
- ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
- + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
- locId));
+ dirsHandler
+ .getLocalPathForWrite(ResourceLocalizationService.NM_PRIVATE_DIR
+ + Path.SEPARATOR
+ + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId));
files.create(nmPrivateContainerTokensPath, EnumSet.of(CREATE, OVERWRITE));
Configuration config = new YarnConfiguration(conf);
- InetSocketAddress nmAddr = config.getSocketAddr(
- YarnConfiguration.NM_BIND_HOST,
- YarnConfiguration.NM_LOCALIZER_ADDRESS,
- YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
- YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
+ InetSocketAddress nmAddr =
+ config.getSocketAddr(YarnConfiguration.NM_BIND_HOST,
+ YarnConfiguration.NM_LOCALIZER_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
String appId = "application_01_01";
exec = new LinuxContainerExecutor() {
@Override
- public void buildMainArgs(List<String> command, String user, String appId,
- String locId, InetSocketAddress nmAddr, List<String> localDirs) {
- MockContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr,
- localDirs);
+ public void buildMainArgs(List<String> command, String user,
+ String appId, String locId, InetSocketAddress nmAddr,
+ List<String> localDirs) {
+ MockContainerLocalizer.buildMainArgs(command, user, appId, locId,
+ nmAddr, localDirs);
}
};
exec.setConf(conf);
@@ -277,44 +357,68 @@ public class TestLinuxContainerExecutor {
files.create(nmPrivateContainerTokensPath2, EnumSet.of(CREATE, OVERWRITE));
exec.startLocalizer(nmPrivateContainerTokensPath2, nmAddr, appSubmitter,
appId, locId2, dirsHandler);
+ cleanupUserAppCache(appSubmitter);
}
-
+
@Test
- public void testContainerLaunch() throws IOException {
- if (!shouldRun()) {
- return;
- }
+ public void testContainerLaunch() throws Exception {
+ Assume.assumeTrue(shouldRun());
+ String expectedRunAsUser =
+ conf.get(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY,
+ YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER);
+
+ File touchFile = new File(workSpace, "touch-file");
+ int ret = runAndBlock("touch", touchFile.getAbsolutePath());
+
+ assertEquals(0, ret);
+ FileStatus fileStatus =
+ FileContext.getLocalFSFileContext().getFileStatus(
+ new Path(touchFile.getAbsolutePath()));
+ assertEquals(expectedRunAsUser, fileStatus.getOwner());
+ cleanupAppFiles(expectedRunAsUser);
+ }
+
+ @Test
+ public void testNonSecureRunAsSubmitter() throws Exception {
+ Assume.assumeTrue(shouldRun());
+ Assume.assumeFalse(UserGroupInformation.isSecurityEnabled());
+ String expectedRunAsUser = appSubmitter;
+ conf.set(YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS, "false");
+ exec.setConf(conf);
File touchFile = new File(workSpace, "touch-file");
int ret = runAndBlock("touch", touchFile.getAbsolutePath());
-
+
assertEquals(0, ret);
- FileStatus fileStatus = FileContext.getLocalFSFileContext().getFileStatus(
+ FileStatus fileStatus =
+ FileContext.getLocalFSFileContext().getFileStatus(
new Path(touchFile.getAbsolutePath()));
- assertEquals(appSubmitter, fileStatus.getOwner());
+ assertEquals(expectedRunAsUser, fileStatus.getOwner());
+ cleanupAppFiles(expectedRunAsUser);
+ // reset conf
+ conf.unset(YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS);
+ exec.setConf(conf);
}
@Test
public void testContainerKill() throws Exception {
- if (!shouldRun()) {
- return;
- }
-
- final ContainerId sleepId = getNextContainerId();
+ Assume.assumeTrue(shouldRun());
+
+ final ContainerId sleepId = getNextContainerId();
Thread t = new Thread() {
public void run() {
try {
runAndBlock(sleepId, "sleep", "100");
} catch (IOException e) {
- LOG.warn("Caught exception while running sleep",e);
+ LOG.warn("Caught exception while running sleep", e);
}
};
};
- t.setDaemon(true); //If it does not exit we shouldn't block the test.
+ t.setDaemon(true); // If it does not exit we shouldn't block the test.
t.start();
assertTrue(t.isAlive());
-
+
String pid = null;
int count = 10;
while ((pid = exec.getProcessId(sleepId)) == null && count > 0) {
@@ -328,40 +432,77 @@ public class TestLinuxContainerExecutor {
exec.signalContainer(appSubmitter, pid, Signal.TERM);
LOG.info("sleeping for 100ms to let the sleep be killed");
Thread.sleep(100);
-
+
assertFalse(t.isAlive());
+ cleanupAppFiles(appSubmitter);
+ }
+
+ @Test
+ public void testCGroups() throws Exception {
+ Assume.assumeTrue(shouldRun());
+ String cgroupsMount = System.getProperty("cgroups.mount");
+ Assume.assumeTrue((cgroupsMount != null) && !cgroupsMount.isEmpty());
+
+ assertTrue("Cgroups mount point does not exist", new File(
+ cgroupsMount).exists());
+ List<String> cgroupKVs = new ArrayList<>();
+
+ String hierarchy = "hadoop-yarn";
+ String[] controllers = { "cpu", "net_cls" };
+ for (String controller : controllers) {
+ cgroupKVs.add(controller + "=" + cgroupsMount + "/" + controller);
+ assertTrue(new File(cgroupsMount, controller).exists());
+ }
+
+ try {
+ exec.mountCgroups(cgroupKVs, hierarchy);
+ for (String controller : controllers) {
+ assertTrue(controller + " cgroup not mounted", new File(
+ cgroupsMount + "/" + controller + "/tasks").exists());
+ assertTrue(controller + " cgroup hierarchy not created",
+ new File(cgroupsMount + "/" + controller + "/" + hierarchy).exists());
+ assertTrue(controller + " cgroup hierarchy created incorrectly",
+ new File(cgroupsMount + "/" + controller + "/" + hierarchy
+ + "/tasks").exists());
+ }
+ } catch (IOException ie) {
+ fail("Couldn't mount cgroups " + ie.toString());
+ throw ie;
+ }
}
@Test
public void testLocalUser() throws Exception {
+ Assume.assumeTrue(shouldRun());
try {
- //nonsecure default
+ // nonsecure default
Configuration conf = new YarnConfiguration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
- "simple");
+ "simple");
UserGroupInformation.setConfiguration(conf);
LinuxContainerExecutor lce = new LinuxContainerExecutor();
lce.setConf(conf);
- Assert.assertEquals(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
+ Assert.assertEquals(
+ YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
lce.getRunAsUser("foo"));
- //nonsecure custom setting
+ // nonsecure custom setting
conf.set(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, "bar");
lce = new LinuxContainerExecutor();
lce.setConf(conf);
Assert.assertEquals("bar", lce.getRunAsUser("foo"));
- //nonsecure without limits
+ // nonsecure without limits
conf.set(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, "bar");
conf.setBoolean(YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS, false);
lce = new LinuxContainerExecutor();
lce.setConf(conf);
Assert.assertEquals("foo", lce.getRunAsUser("foo"));
- //secure
+ // secure
conf = new YarnConfiguration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
- "kerberos");
+ "kerberos");
UserGroupInformation.setConfiguration(conf);
lce = new LinuxContainerExecutor();
lce.setConf(conf);
@@ -369,49 +510,50 @@ public class TestLinuxContainerExecutor {
} finally {
Configuration conf = new YarnConfiguration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
- "simple");
+ "simple");
UserGroupInformation.setConfiguration(conf);
}
}
@Test
public void testNonsecureUsernamePattern() throws Exception {
+ Assume.assumeTrue(shouldRun());
try {
- //nonsecure default
+ // nonsecure default
Configuration conf = new YarnConfiguration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
- "simple");
+ "simple");
UserGroupInformation.setConfiguration(conf);
LinuxContainerExecutor lce = new LinuxContainerExecutor();
lce.setConf(conf);
lce.verifyUsernamePattern("foo");
try {
lce.verifyUsernamePattern("foo/x");
- Assert.fail();
+ fail();
} catch (IllegalArgumentException ex) {
- //NOP
+ // NOP
} catch (Throwable ex) {
- Assert.fail(ex.toString());
+ fail(ex.toString());
}
-
- //nonsecure custom setting
+
+ // nonsecure custom setting
conf.set(YarnConfiguration.NM_NONSECURE_MODE_USER_PATTERN_KEY, "foo");
lce = new LinuxContainerExecutor();
lce.setConf(conf);
lce.verifyUsernamePattern("foo");
try {
lce.verifyUsernamePattern("bar");
- Assert.fail();
+ fail();
} catch (IllegalArgumentException ex) {
- //NOP
+ // NOP
} catch (Throwable ex) {
- Assert.fail(ex.toString());
+ fail(ex.toString());
}
- //secure, pattern matching does not kick in.
+ // secure, pattern matching does not kick in.
conf = new YarnConfiguration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
- "kerberos");
+ "kerberos");
UserGroupInformation.setConfiguration(conf);
lce = new LinuxContainerExecutor();
lce.setConf(conf);
@@ -420,13 +562,14 @@ public class TestLinuxContainerExecutor {
} finally {
Configuration conf = new YarnConfiguration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
- "simple");
+ "simple");
UserGroupInformation.setConfiguration(conf);
}
}
- @Test(timeout=10000)
+ @Test(timeout = 10000)
public void testPostExecuteAfterReacquisition() throws Exception {
+ Assume.assumeTrue(shouldRun());
// make up some bogus container ID
ApplicationId appId = ApplicationId.newInstance(12345, 67890);
ApplicationAttemptId attemptId =
@@ -435,7 +578,7 @@ public class TestLinuxContainerExecutor {
Configuration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
- TestResourceHandler.class, LCEResourcesHandler.class);
+ TestResourceHandler.class, LCEResourcesHandler.class);
LinuxContainerExecutor lce = new LinuxContainerExecutor();
lce.setConf(conf);
try {
@@ -444,7 +587,7 @@ public class TestLinuxContainerExecutor {
// expected if LCE isn't setup right, but not necessary for this test
}
lce.reacquireContainer("foouser", cid);
- Assert.assertTrue("postExec not called after reacquisition",
+ assertTrue("postExec not called after reacquisition",
TestResourceHandler.postExecContainers.contains(cid));
}