Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:43:35 UTC
svn commit: r1077679 [2/6] - in
/hadoop/common/branches/branch-0.20-security-patches: ./
src/c++/task-controller/ src/c++/task-controller/impl/
src/c++/task-controller/test/ src/c++/task-controller/tests/
src/core/org/apache/hadoop/fs/ src/core/org/apa...
Added: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/test/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/test/test-task-controller.c?rev=1077679&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/test/test-task-controller.c (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/test/test-task-controller.c Fri Mar 4 04:43:33 2011
@@ -0,0 +1,763 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "configuration.h"
+#include "task-controller.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
+
+#define TEST_ROOT "/tmp/test-task-controller"
+#define DONT_TOUCH_FILE "dont-touch-me"
+
+static char* username = NULL;
+
+/**
+ * Run the command using the effective user id.
+ * It can't use system, since bash seems to copy the real user id into the
+ * effective id.
+ */
+void run(const char *cmd) {
+ fflush(stdout);
+ fflush(stderr);
+ pid_t child = fork();
+ if (child == -1) {
+ printf("FAIL: failed to fork - %s\n", strerror(errno));
+ } else if (child == 0) {
+ char *cmd_copy = strdup(cmd);
+ char *ptr;
+ int words = 1;
+ for(ptr = strchr(cmd_copy, ' '); ptr; ptr = strchr(ptr+1, ' ')) {
+ words += 1;
+ }
+ char **argv = malloc(sizeof(char *) * (words + 1));
+ ptr = strtok(cmd_copy, " ");
+ int i = 0;
+ argv[i++] = ptr;
+ while (ptr != NULL) {
+ ptr = strtok(NULL, " ");
+ argv[i++] = ptr;
+ }
+ if (execvp(argv[0], argv) != 0) {
+ printf("FAIL: exec failed in child %s - %s\n", cmd, strerror(errno));
+ exit(42);
+ }
+ } else {
+ int status = 0;
+ if (waitpid(child, &status, 0) <= 0) {
+ printf("FAIL: failed waiting for child process %s pid %d - %s\n",
+ cmd, child, strerror(errno));
+ exit(1);
+ }
+ if (!WIFEXITED(status)) {
+ printf("FAIL: process %s pid %d did not exit\n", cmd, child);
+ exit(1);
+ }
+ if (WEXITSTATUS(status) != 0) {
+ printf("FAIL: process %s pid %d exited with error status %d\n", cmd,
+ child, WEXITSTATUS(status));
+ exit(1);
+ }
+ }
+}
+
+int write_config_file(char *file_name) {
+ FILE *file;
+ file = fopen(file_name, "w");
+ if (file == NULL) {
+ printf("Failed to open %s.\n", file_name);
+ return EXIT_FAILURE;
+ }
+ fprintf(file, "mapred.local.dir=" TEST_ROOT "/local-1");
+ int i;
+ for(i=2; i < 5; ++i) {
+ fprintf(file, "," TEST_ROOT "/local-%d", i);
+ }
+ fprintf(file, "\n");
+ fprintf(file, "hadoop.log.dir=" TEST_ROOT "/logs\n");
+ fclose(file);
+ return 0;
+}
+
+void create_tt_roots() {
+ char** tt_roots = get_values("mapred.local.dir");
+ char** tt_root;
+ for(tt_root=tt_roots; *tt_root != NULL; ++tt_root) {
+ if (mkdir(*tt_root, 0755) != 0) {
+ printf("FAIL: Can't create directory %s - %s\n", *tt_root,
+ strerror(errno));
+ exit(1);
+ }
+ char buffer[100000];
+ sprintf(buffer, "%s/taskTracker", *tt_root);
+ if (mkdir(buffer, 0755) != 0) {
+ printf("FAIL: Can't create directory %s - %s\n", buffer,
+ strerror(errno));
+ exit(1);
+ }
+ }
+ free_values(tt_roots);
+}
+
+void test_get_user_directory() {
+ char *user_dir = get_user_directory("/tmp", "user");
+ char *expected = "/tmp/taskTracker/user";
+ if (strcmp(user_dir, expected) != 0) {
+ printf("test_get_user_directory expected %s got %s\n", user_dir, expected);
+ exit(1);
+ }
+ free(user_dir);
+}
+
+void test_get_job_directory() {
+ char *expected = "/tmp/taskTracker/user/jobcache/job_200906101234_0001";
+ char *job_dir = (char *) get_job_directory("/tmp", "user",
+ "job_200906101234_0001");
+ if (strcmp(job_dir, expected) != 0) {
+ exit(1);
+ }
+ free(job_dir);
+}
+
+void test_get_attempt_directory() {
+ char *attempt_dir = get_attempt_work_directory("/tmp", "owen", "job_1",
+ "attempt_1");
+ char *expected = "/tmp/taskTracker/owen/jobcache/job_1/attempt_1/work";
+ if (strcmp(attempt_dir, expected) != 0) {
+ printf("Fail get_attempt_work_directory got %s expected %s\n",
+ attempt_dir, expected);
+ }
+ free(attempt_dir);
+}
+
+void test_get_task_launcher_file() {
+ char *expected_file = ("/tmp/taskTracker/user/jobcache/job_200906101234_0001"
+ "/taskjvm.sh");
+ char *job_dir = get_job_directory("/tmp", "user",
+ "job_200906101234_0001");
+ char *task_file = get_task_launcher_file(job_dir);
+ if (strcmp(task_file, expected_file) != 0) {
+ printf("failure to match expected task file %s vs %s\n", task_file,
+ expected_file);
+ exit(1);
+ }
+ free(job_dir);
+ free(task_file);
+}
+
+void test_get_job_log_dir() {
+ char *expected = TEST_ROOT "/logs/userlogs/job_200906101234_0001";
+ char *logdir = get_job_log_directory("job_200906101234_0001");
+ if (strcmp(logdir, expected) != 0) {
+ printf("Fail get_job_log_dir got %s expected %s\n", logdir, expected);
+ exit(1);
+ }
+ free(logdir);
+}
+
+void test_get_task_log_dir() {
+ char *logdir = get_job_log_directory("job_5/task_4");
+ char *expected = TEST_ROOT "/logs/userlogs/job_5/task_4";
+ if (strcmp(logdir, expected) != 0) {
+ printf("FAIL: get_task_log_dir expected %s got %s\n", logdir, expected);
+ }
+ free(logdir);
+}
+
+void test_check_user() {
+ printf("\nTesting test_check_user\n");
+ struct passwd *user = check_user(username);
+ if (user == NULL) {
+ printf("FAIL: failed check for user %s\n", username);
+ exit(1);
+ }
+ free(user);
+ if (check_user("lp") != NULL) {
+ printf("FAIL: failed check for system user lp\n");
+ exit(1);
+ }
+ if (check_user("root") != NULL) {
+ printf("FAIL: failed check for system user root\n");
+ exit(1);
+ }
+ if (check_user("mapred") != NULL) {
+ printf("FAIL: failed check for hadoop user mapred\n");
+ exit(1);
+ }
+}
+
+void test_check_configuration_permissions() {
+ printf("\nTesting check_configuration_permissions\n");
+ if (check_configuration_permissions("/etc/passwd") != 0) {
+ printf("FAIL: failed permission check on /etc/passwd\n");
+ exit(1);
+ }
+ if (check_configuration_permissions(TEST_ROOT) == 0) {
+ printf("FAIL: failed permission check on %s\n", TEST_ROOT);
+ exit(1);
+ }
+}
+
+void test_delete_task() {
+ if (initialize_user(username)) {
+ printf("FAIL: failed to initialized user %s\n", username);
+ exit(1);
+ }
+ char* job_dir = get_job_directory(TEST_ROOT "/local-2", username, "job_1");
+ char* dont_touch = get_job_directory(TEST_ROOT "/local-2", username,
+ DONT_TOUCH_FILE);
+ char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-2",
+ username, "job_1", "task_1");
+ char buffer[100000];
+ sprintf(buffer, "mkdir -p %s/who/let/the/dogs/out/who/who", task_dir);
+ run(buffer);
+ sprintf(buffer, "touch %s", dont_touch);
+ run(buffer);
+
+ // soft link to the canary file from the task directory
+ sprintf(buffer, "ln -s %s %s/who/softlink", dont_touch, task_dir);
+ run(buffer);
+ // hard link to the canary file from the task directory
+ sprintf(buffer, "ln %s %s/who/hardlink", dont_touch, task_dir);
+ run(buffer);
+ // create a dot file in the task directory
+ sprintf(buffer, "touch %s/who/let/.dotfile", task_dir);
+ run(buffer);
+ // create a no permission file
+ sprintf(buffer, "touch %s/who/let/protect", task_dir);
+ run(buffer);
+ sprintf(buffer, "chmod 000 %s/who/let/protect", task_dir);
+ run(buffer);
+ // create a no permission directory
+ sprintf(buffer, "chmod 000 %s/who/let", task_dir);
+ run(buffer);
+
+ // delete task directory
+ int ret = delete_as_user(username, "jobcache/job_1/task_1");
+ if (ret != 0) {
+ printf("FAIL: return code from delete_as_user is %d\n", ret);
+ exit(1);
+ }
+
+ // check to make sure the task directory is gone
+ if (access(task_dir, R_OK) == 0) {
+ printf("FAIL: failed to delete the directory - %s\n", task_dir);
+ exit(1);
+ }
+ // check to make sure the job directory is not gone
+ if (access(job_dir, R_OK) != 0) {
+ printf("FAIL: accidently deleted the directory - %s\n", job_dir);
+ exit(1);
+ }
+ // but that the canary is not gone
+ if (access(dont_touch, R_OK) != 0) {
+ printf("FAIL: accidently deleted file %s\n", dont_touch);
+ exit(1);
+ }
+ sprintf(buffer, "chmod -R 700 %s", job_dir);
+ run(buffer);
+ sprintf(buffer, "rm -fr %s", job_dir);
+ run(buffer);
+ free(job_dir);
+ free(task_dir);
+ free(dont_touch);
+}
+
+void test_delete_job() {
+ char* job_dir = get_job_directory(TEST_ROOT "/local-2", username, "job_2");
+ char* dont_touch = get_job_directory(TEST_ROOT "/local-2", username,
+ DONT_TOUCH_FILE);
+ char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-2",
+ username, "job_2", "task_1");
+ char buffer[100000];
+ sprintf(buffer, "mkdir -p %s/who/let/the/dogs/out/who/who", task_dir);
+ run(buffer);
+ sprintf(buffer, "touch %s", dont_touch);
+ run(buffer);
+
+ // soft link to the canary file from the task directory
+ sprintf(buffer, "ln -s %s %s/who/softlink", dont_touch, task_dir);
+ run(buffer);
+ // hard link to the canary file from the task directory
+ sprintf(buffer, "ln %s %s/who/hardlink", dont_touch, task_dir);
+ run(buffer);
+ // create a dot file in the task directory
+ sprintf(buffer, "touch %s/who/let/.dotfile", task_dir);
+ run(buffer);
+ // create a no permission file
+ sprintf(buffer, "touch %s/who/let/protect", task_dir);
+ run(buffer);
+ sprintf(buffer, "chmod 000 %s/who/let/protect", task_dir);
+ run(buffer);
+ // create a no permission directory
+ sprintf(buffer, "chmod 000 %s/who/let", task_dir);
+ run(buffer);
+
+ // delete task directory
+ int ret = delete_as_user(username, "jobcache/job_2");
+ if (ret != 0) {
+ printf("FAIL: return code from delete_as_user is %d\n", ret);
+ exit(1);
+ }
+
+ // check to make sure the task directory is gone
+ if (access(task_dir, R_OK) == 0) {
+ printf("FAIL: failed to delete the directory - %s\n", task_dir);
+ exit(1);
+ }
+ // check to make sure the job directory is gone
+ if (access(job_dir, R_OK) == 0) {
+ printf("FAIL: didn't delete the directory - %s\n", job_dir);
+ exit(1);
+ }
+ // but that the canary is not gone
+ if (access(dont_touch, R_OK) != 0) {
+ printf("FAIL: accidently deleted file %s\n", dont_touch);
+ exit(1);
+ }
+ free(job_dir);
+ free(task_dir);
+ free(dont_touch);
+}
+
+
+void test_delete_user() {
+ printf("\nTesting delete_user\n");
+ char* job_dir = get_job_directory(TEST_ROOT "/local-1", username, "job_3");
+ if (mkdirs(job_dir, 0700) != 0) {
+ exit(1);
+ }
+ char buffer[100000];
+ sprintf(buffer, "%s/local-1/taskTracker/%s", TEST_ROOT, username);
+ if (access(buffer, R_OK) != 0) {
+ printf("FAIL: directory missing before test\n");
+ exit(1);
+ }
+ if (delete_as_user(username, "") != 0) {
+ exit(1);
+ }
+ if (access(buffer, R_OK) == 0) {
+ printf("FAIL: directory not deleted\n");
+ exit(1);
+ }
+ if (access(TEST_ROOT "/local-1", R_OK) != 0) {
+ printf("FAIL: local-1 directory does not exist\n");
+ exit(1);
+ }
+ free(job_dir);
+}
+
+void test_delete_log_directory() {
+ printf("\nTesting delete_log_directory\n");
+ char *job_log_dir = get_job_log_directory("job_1");
+ if (job_log_dir == NULL) {
+ exit(1);
+ }
+ if (create_directory_for_user(job_log_dir) != 0) {
+ exit(1);
+ }
+ free(job_log_dir);
+ char *task_log_dir = get_job_log_directory("job_1/task_2");
+ if (task_log_dir == NULL) {
+ exit(1);
+ }
+ if (mkdirs(task_log_dir, 0700) != 0) {
+ exit(1);
+ }
+ if (access(TEST_ROOT "/logs/userlogs/job_1/task_2", R_OK) != 0) {
+ printf("FAIL: can't access task directory - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (delete_log_directory("job_1/task_2") != 0) {
+ printf("FAIL: can't delete task directory\n");
+ exit(1);
+ }
+ if (access(TEST_ROOT "/logs/userlogs/job_1/task_2", R_OK) == 0) {
+ printf("FAIL: task directory not deleted\n");
+ exit(1);
+ }
+ if (access(TEST_ROOT "/logs/userlogs/job_1", R_OK) != 0) {
+ printf("FAIL: job directory not deleted - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (delete_log_directory("job_1") != 0) {
+ printf("FAIL: can't delete task directory\n");
+ exit(1);
+ }
+ if (access(TEST_ROOT "/logs/userlogs/job_1", R_OK) == 0) {
+ printf("FAIL: job directory not deleted\n");
+ exit(1);
+ }
+ free(task_log_dir);
+}
+
+void run_test_in_child(const char* test_name, void (*func)()) {
+ printf("\nRunning test %s in child process\n", test_name);
+ fflush(stdout);
+ fflush(stderr);
+ pid_t child = fork();
+ if (child == -1) {
+ printf("FAIL: fork failed\n");
+ exit(1);
+ } else if (child == 0) {
+ func();
+ exit(0);
+ } else {
+ int status = 0;
+ if (waitpid(child, &status, 0) == -1) {
+ printf("FAIL: waitpid %d failed - %s\n", child, strerror(errno));
+ exit(1);
+ }
+ if (!WIFEXITED(status)) {
+ printf("FAIL: child %d didn't exit - %d\n", child, status);
+ exit(1);
+ }
+ if (WEXITSTATUS(status) != 0) {
+ printf("FAIL: child %d exited with bad status %d\n",
+ child, WEXITSTATUS(status));
+ exit(1);
+ }
+ }
+}
+
+void test_signal_task() {
+ printf("\nTesting signal_task\n");
+ fflush(stdout);
+ fflush(stderr);
+ pid_t child = fork();
+ if (child == -1) {
+ printf("FAIL: fork failed\n");
+ exit(1);
+ } else if (child == 0) {
+ if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+ exit(1);
+ }
+ sleep(3600);
+ exit(0);
+ } else {
+ printf("Child task launched as %d\n", child);
+ if (signal_user_task(username, child, SIGQUIT) != 0) {
+ exit(1);
+ }
+ int status = 0;
+ if (waitpid(child, &status, 0) == -1) {
+ printf("FAIL: waitpid failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (!WIFSIGNALED(status)) {
+ printf("FAIL: child wasn't signalled - %d\n", status);
+ exit(1);
+ }
+ if (WTERMSIG(status) != SIGQUIT) {
+ printf("FAIL: child was killed with %d instead of %d\n",
+ WTERMSIG(status), SIGQUIT);
+ exit(1);
+ }
+ }
+}
+
+void test_signal_task_group() {
+ printf("\nTesting group signal_task\n");
+ fflush(stdout);
+ fflush(stderr);
+ pid_t child = fork();
+ if (child == -1) {
+ printf("FAIL: fork failed\n");
+ exit(1);
+ } else if (child == 0) {
+ setpgrp();
+ if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+ exit(1);
+ }
+ sleep(3600);
+ exit(0);
+ }
+ printf("Child task launched as %d\n", child);
+ if (signal_user_task(username, child, SIGKILL) != 0) {
+ exit(1);
+ }
+ int status = 0;
+ if (waitpid(child, &status, 0) == -1) {
+ printf("FAIL: waitpid failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (!WIFSIGNALED(status)) {
+ printf("FAIL: child wasn't signalled - %d\n", status);
+ exit(1);
+ }
+ if (WTERMSIG(status) != SIGKILL) {
+ printf("FAIL: child was killed with %d instead of %d\n",
+ WTERMSIG(status), SIGKILL);
+ exit(1);
+ }
+}
+
+void test_init_job() {
+ printf("\nTesting init job\n");
+ if (seteuid(0) != 0) {
+ printf("FAIL: seteuid to root failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ FILE* creds = fopen(TEST_ROOT "/creds.txt", "w");
+ if (creds == NULL) {
+ printf("FAIL: failed to create credentials file - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (fprintf(creds, "secret key\n") < 0) {
+ printf("FAIL: fprintf failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (fclose(creds) != 0) {
+ printf("FAIL: fclose failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ FILE* job_xml = fopen(TEST_ROOT "/job.xml", "w");
+ if (job_xml == NULL) {
+ printf("FAIL: failed to create job file - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (fprintf(job_xml, "<jobconf/>\n") < 0) {
+ printf("FAIL: fprintf failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (fclose(job_xml) != 0) {
+ printf("FAIL: fclose failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (seteuid(user_detail->pw_uid) != 0) {
+ printf("FAIL: failed to seteuid back to user - %s\n", strerror(errno));
+ exit(1);
+ }
+ fflush(stdout);
+ fflush(stderr);
+ pid_t child = fork();
+ if (child == -1) {
+ printf("FAIL: failed to fork process for init_job - %s\n",
+ strerror(errno));
+ exit(1);
+ } else if (child == 0) {
+ char *final_pgm[] = {"touch", "my-touch-file", 0};
+ if (initialize_job(username, "job_4", TEST_ROOT "/creds.txt",
+ TEST_ROOT "/job.xml", final_pgm) != 0) {
+ printf("FAIL: failed in child\n");
+ exit(42);
+ }
+ // should never return
+ exit(1);
+ }
+ int status = 0;
+ if (waitpid(child, &status, 0) <= 0) {
+ printf("FAIL: failed waiting for process %d - %s\n", child,
+ strerror(errno));
+ exit(1);
+ }
+ if (access(TEST_ROOT "/logs/userlogs/job_4", R_OK) != 0) {
+ printf("FAIL: failed to create job log directory\n");
+ exit(1);
+ }
+ char* job_dir = get_job_directory(TEST_ROOT "/local-1", username, "job_4");
+ if (access(job_dir, R_OK) != 0) {
+ printf("FAIL: failed to create job directory %s\n", job_dir);
+ exit(1);
+ }
+ char buffer[100000];
+ sprintf(buffer, "%s/jobToken", job_dir);
+ if (access(buffer, R_OK) != 0) {
+ printf("FAIL: failed to create credentials %s\n", buffer);
+ exit(1);
+ }
+ sprintf(buffer, "%s/my-touch-file", job_dir);
+ if (access(buffer, R_OK) != 0) {
+ printf("FAIL: failed to create touch file %s\n", buffer);
+ exit(1);
+ }
+ free(job_dir);
+ job_dir = get_job_log_directory("job_4");
+ if (access(job_dir, R_OK) != 0) {
+ printf("FAIL: failed to create job log directory %s\n", job_dir);
+ exit(1);
+ }
+ free(job_dir);
+}
+
+void test_run_task() {
+ printf("\nTesting run task\n");
+ if (seteuid(0) != 0) {
+ printf("FAIL: seteuid to root failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ const char* script_name = TEST_ROOT "/task-script";
+ FILE* script = fopen(script_name, "w");
+ if (script == NULL) {
+ printf("FAIL: failed to create script file - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (seteuid(user_detail->pw_uid) != 0) {
+ printf("FAIL: failed to seteuid back to user - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (fprintf(script, "#!/bin/bash\n"
+ "touch foobar\n"
+ "exit 0") < 0) {
+ printf("FAIL: fprintf failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (fclose(script) != 0) {
+ printf("FAIL: fclose failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ fflush(stdout);
+ fflush(stderr);
+ char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-1",
+ username, "job_4", "task_1");
+ pid_t child = fork();
+ if (child == -1) {
+ printf("FAIL: failed to fork process for init_job - %s\n",
+ strerror(errno));
+ exit(1);
+ } else if (child == 0) {
+ if (run_task_as_user(username, "job_4", "task_1",
+ task_dir, script_name) != 0) {
+ printf("FAIL: failed in child\n");
+ exit(42);
+ }
+ // should never return
+ exit(1);
+ }
+ int status = 0;
+ if (waitpid(child, &status, 0) <= 0) {
+ printf("FAIL: failed waiting for process %d - %s\n", child,
+ strerror(errno));
+ exit(1);
+ }
+ if (access(TEST_ROOT "/logs/userlogs/job_4/task_1", R_OK) != 0) {
+ printf("FAIL: failed to create task log directory\n");
+ exit(1);
+ }
+ if (access(task_dir, R_OK) != 0) {
+ printf("FAIL: failed to create task directory %s\n", task_dir);
+ exit(1);
+ }
+ char buffer[100000];
+ sprintf(buffer, "%s/foobar", task_dir);
+ if (access(buffer, R_OK) != 0) {
+ printf("FAIL: failed to create touch file %s\n", buffer);
+ exit(1);
+ }
+ free(task_dir);
+ task_dir = get_job_log_directory("job_4/task_1");
+ if (access(task_dir, R_OK) != 0) {
+ printf("FAIL: failed to create job log directory %s\n", task_dir);
+ exit(1);
+ }
+ free(task_dir);
+}
+
+int main(int argc, char **argv) {
+ LOGFILE = stdout;
+ int my_username = 0;
+
+ // clean up any junk from previous run
+ system("chmod -R u=rwx " TEST_ROOT "; rm -fr " TEST_ROOT);
+
+ if (mkdirs(TEST_ROOT "/logs/userlogs", 0755) != 0) {
+ exit(1);
+ }
+
+ if (write_config_file(TEST_ROOT "/test.cfg") != 0) {
+ exit(1);
+ }
+ read_config(TEST_ROOT "/test.cfg");
+
+ create_tt_roots();
+
+ if (getuid() == 0 && argc == 2) {
+ username = argv[1];
+ } else {
+ username = strdup(getpwuid(getuid())->pw_name);
+ my_username = 1;
+ }
+ set_tasktracker_uid(geteuid(), getegid());
+
+ if (set_user(username)) {
+ exit(1);
+ }
+
+ printf("\nStarting tests\n");
+
+ printf("\nTesting get_user_directory()\n");
+ test_get_user_directory();
+
+ printf("\nTesting get_job_directory()\n");
+ test_get_job_directory();
+
+ printf("\nTesting get_attempt_directory()\n");
+ test_get_attempt_directory();
+
+ printf("\nTesting get_task_launcher_file()\n");
+ test_get_task_launcher_file();
+
+ printf("\nTesting get_job_log_dir()\n");
+ test_get_job_log_dir();
+
+ test_check_configuration_permissions();
+
+ printf("\nTesting get_task_log_dir()\n");
+ test_get_task_log_dir();
+
+ printf("\nTesting delete_task()\n");
+ test_delete_task();
+
+ printf("\nTesting delete_job()\n");
+ test_delete_job();
+
+ test_delete_user();
+
+ test_check_user();
+
+ test_delete_log_directory();
+
+ // the tests that change user need to be run in a subshell, so that
+ // when they change user they don't give up our privs
+ run_test_in_child("test_signal_task", test_signal_task);
+ run_test_in_child("test_signal_task_group", test_signal_task_group);
+
+ // init job and run task can't be run if you aren't testing as root
+ if (getuid() == 0) {
+ // these tests do internal forks so that the change_owner and execs
+ // don't mess up our process.
+ test_init_job();
+ test_run_task();
+ }
+
+ seteuid(0);
+ run("rm -fr " TEST_ROOT);
+ printf("\nFinished tests\n");
+
+ if (my_username) {
+ free(username);
+ }
+ free_configurations();
+ return 0;
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/LocalDirAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/LocalDirAllocator.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/LocalDirAllocator.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/LocalDirAllocator.java Fri Mar 4 04:43:33 2011
@@ -142,6 +142,23 @@ public class LocalDirAllocator {
return context.getLocalPathToRead(pathStr, conf);
}
+ /**
+ * Get all of the paths that currently exist in the working directories.
+ * @param pathStr the path underneath the roots
+ * @param conf the configuration to look up the roots in
+ * @return all of the paths that exist under any of the roots
+ * @throws IOException
+ */
+ public Iterable<Path> getAllLocalPathsToRead(String pathStr,
+ Configuration conf
+ ) throws IOException {
+ AllocatorPerContext context;
+ synchronized (this) {
+ context = obtainContext(contextCfgItemName);
+ }
+ return context.getAllLocalPathsToRead(pathStr, conf);
+ }
+
/** Creates a temporary file in the local FS. Pass size as -1 if not known
* apriori. We round-robin over the set of disks (via the configured dirs)
* and select the first complete path which has enough space. A file is
@@ -210,7 +227,8 @@ public class LocalDirAllocator {
/** This method gets called everytime before any read/write to make sure
* that any change to localDirs is reflected immediately.
*/
- private void confChanged(Configuration conf) throws IOException {
+ private synchronized void confChanged(Configuration conf
+ ) throws IOException {
String newLocalDirs = conf.get(contextCfgItemName);
if (!newLocalDirs.equals(savedLocalDirs)) {
localDirs = conf.getStrings(contextCfgItemName);
@@ -270,17 +288,6 @@ public class LocalDirAllocator {
return dirNumLastAccessed;
}
- /** Get a path from the local FS. This method should be used if the size of
- * the file is not known a priori.
- *
- * It will use roulette selection, picking directories
- * with probability proportional to their available space.
- */
- public synchronized Path getLocalPathForWrite(String path,
- Configuration conf) throws IOException {
- return getLocalPathForWrite(path, SIZE_UNKNOWN, conf);
- }
-
/** Get a path from the local FS. If size is known, we go
* round-robin over the set of disks (via the configured dirs) and return
* the first complete path which has enough space.
@@ -395,6 +402,76 @@ public class LocalDirAllocator {
" the configured local directories");
}
+ private static
+ class PathIterator implements Iterator<Path>, Iterable<Path> {
+ private final FileSystem fs;
+ private final String pathStr;
+ private int i = 0;
+ private final String[] rootDirs;
+ private Path next = null;
+
+ private PathIterator(FileSystem fs, String pathStr, String[] rootDirs
+ ) throws IOException {
+ this.fs = fs;
+ this.pathStr = pathStr;
+ this.rootDirs = rootDirs;
+ advance();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return next != null;
+ }
+
+ private void advance() throws IOException {
+ while (i < rootDirs.length) {
+ next = new Path(rootDirs[i++], pathStr);
+ if (fs.exists(next)) {
+ return;
+ }
+ }
+ next = null;
+ }
+
+ @Override
+ public Path next() {
+ Path result = next;
+ try {
+ advance();
+ } catch (IOException ie) {
+ throw new RuntimeException("Can't check existance of " + next, ie);
+ }
+ return result;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("read only iterator");
+ }
+
+ @Override
+ public Iterator<Path> iterator() {
+ return this;
+ }
+ }
+
+ /**
+ * Get all of the paths that currently exist in the working directories.
+ * @param pathStr the path underneath the roots
+ * @param conf the configuration to look up the roots in
+ * @return all of the paths that exist under any of the roots
+ * @throws IOException
+ */
+ synchronized Iterable<Path> getAllLocalPathsToRead(String pathStr,
+ Configuration conf
+ ) throws IOException {
+ confChanged(conf);
+ if (pathStr.startsWith("/")) {
+ pathStr = pathStr.substring(1);
+ }
+ return new PathIterator(localFS, pathStr, localDirs);
+ }
+
/** We search through all the configured dirs for the file's existence
* and return true when we find one
*/
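
For reference, a minimal caller sketch (not part of this patch) of the new getAllLocalPathsToRead call added above; the context key "mapred.local.dir" and the relative path "taskTracker/jobcache/job_1" are illustrative assumptions:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;

    public class LocalPathScan {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Allocator bound to the same configuration key the TaskTracker uses.
        LocalDirAllocator allocator = new LocalDirAllocator("mapred.local.dir");
        // The new call walks every configured root and yields only the paths that exist.
        for (Path p : allocator.getAllLocalPathsToRead("taskTracker/jobcache/job_1", conf)) {
          System.out.println("found local copy at " + p);
        }
      }
    }
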
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcessTree.java Fri Mar 4 04:43:33 2011
@@ -33,8 +33,20 @@ import org.apache.hadoop.util.Shell.Shel
public class ProcessTree {
private static final Log LOG = LogFactory.getLog(ProcessTree.class);
-
- public static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 5000L;
+
+ /**
+ * The constants for the signals.
+ */
+ public static enum Signal {
+ QUIT(3), KILL(9), TERM(15);
+ private int value;
+ private Signal(int value) {
+ this.value = value;
+ }
+ public int getValue() {
+ return value;
+ }
+ }
public static final boolean isSetsidAvailable = isSetsidSupported();
private static boolean isSetsidSupported() {
@@ -49,95 +61,8 @@ public class ProcessTree {
setsidSupported = false;
} finally { // handle the exit code
LOG.info("setsid exited with exit code " + shexec.getExitCode());
- return setsidSupported;
- }
- }
-
- /**
- * Destroy the process-tree.
- * @param pid process id of the root process of the subtree of processes
- * to be killed
- * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
- * after sending SIGTERM
- * @param isProcessGroup pid is a process group leader or not
- * @param inBackground Process is to be killed in the back ground with
- * a separate thread
- */
- public static void destroy(String pid, long sleeptimeBeforeSigkill,
- boolean isProcessGroup, boolean inBackground) {
- if(isProcessGroup) {
- destroyProcessGroup(pid, sleeptimeBeforeSigkill, inBackground);
- }
- else {
- //TODO: Destroy all the processes in the subtree in this case also.
- // For the time being, killing only the root process.
- destroyProcess(pid, sleeptimeBeforeSigkill, inBackground);
- }
- }
-
- /** Destroy the process.
- * @param pid Process id of to-be-killed-process
- * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
- * after sending SIGTERM
- * @param inBackground Process is to be killed in the back ground with
- * a separate thread
- */
- protected static void destroyProcess(String pid, long sleeptimeBeforeSigkill,
- boolean inBackground) {
- terminateProcess(pid);
- sigKill(pid, false, sleeptimeBeforeSigkill, inBackground);
- }
-
- /** Destroy the process group.
- * @param pgrpId Process group id of to-be-killed-processes
- * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
- * after sending SIGTERM
- * @param inBackground Process group is to be killed in the back ground with
- * a separate thread
- */
- protected static void destroyProcessGroup(String pgrpId,
- long sleeptimeBeforeSigkill, boolean inBackground) {
- terminateProcessGroup(pgrpId);
- sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground);
- }
-
- /**
- * Sends terminate signal to the process, allowing it to gracefully exit.
- *
- * @param pid pid of the process to be sent SIGTERM
- */
- public static void terminateProcess(String pid) {
- ShellCommandExecutor shexec = null;
- try {
- String[] args = { "kill", pid };
- shexec = new ShellCommandExecutor(args);
- shexec.execute();
- } catch (IOException ioe) {
- LOG.warn("Error executing shell command " + ioe);
- } finally {
- LOG.info("Killing process " + pid +
- " with SIGTERM. Exit code " + shexec.getExitCode());
- }
- }
-
- /**
- * Sends terminate signal to all the process belonging to the passed process
- * group, allowing the group to gracefully exit.
- *
- * @param pgrpId process group id
- */
- public static void terminateProcessGroup(String pgrpId) {
- ShellCommandExecutor shexec = null;
- try {
- String[] args = { "kill", "--", "-" + pgrpId };
- shexec = new ShellCommandExecutor(args);
- shexec.execute();
- } catch (IOException ioe) {
- LOG.warn("Error executing shell command " + ioe);
- } finally {
- LOG.info("Killing all processes in the process group " + pgrpId +
- " with SIGTERM. Exit code " + shexec.getExitCode());
}
+ return setsidSupported;
}
/**
@@ -160,83 +85,63 @@ public class ProcessTree {
LOG.warn("Thread sleep is interrupted.");
}
if(isProcessGroup) {
- killProcessGroup(pid);
+ killProcessGroup(pid, Signal.KILL);
} else {
- killProcess(pid);
+ killProcess(pid, Signal.KILL);
}
}
}
-
- /** Kills the process(OR process group) by sending the signal SIGKILL
- * @param pid Process id(OR process group id) of to-be-deleted-process
- * @param isProcessGroup Is pid a process group id of to-be-deleted-processes
- * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
- * after sending SIGTERM
- * @param inBackground Process is to be killed in the back ground with
- * a separate thread
- */
- private static void sigKill(String pid, boolean isProcessGroup,
- long sleeptimeBeforeSigkill, boolean inBackground) {
-
- if(inBackground) { // use a separate thread for killing
- SigKillThread sigKillThread = new SigKillThread(pid, isProcessGroup,
- sleeptimeBeforeSigkill);
- sigKillThread.setDaemon(true);
- sigKillThread.start();
- }
- else {
- sigKillInCurrentThread(pid, isProcessGroup, sleeptimeBeforeSigkill);
- }
- }
-
/**
- * Sends kill signal to process, forcefully terminating the process.
+ * Sends signal to process, forcefully terminating the process.
*
* @param pid process id
+ * @param signal the signal number to send
*/
- public static void killProcess(String pid) {
+ public static void killProcess(String pid, Signal signal) {
//If process tree is not alive then return immediately.
if(!ProcessTree.isAlive(pid)) {
return;
}
- String[] args = { "kill", "-9", pid };
+ String[] args = { "kill", "-" + signal.getValue(), pid };
ShellCommandExecutor shexec = new ShellCommandExecutor(args);
try {
shexec.execute();
} catch (IOException e) {
- LOG.warn("Error sending SIGKILL to process "+ pid + " ."+
+ LOG.warn("Error sending signal " + signal + " to process "+ pid + " ."+
StringUtils.stringifyException(e));
} finally {
- LOG.info("Killing process " + pid + " with SIGKILL. Exit code "
- + shexec.getExitCode());
+ LOG.info("Killing process " + pid + " with signal " + signal +
+ ". Exit code " + shexec.getExitCode());
}
}
/**
- * Sends kill signal to all process belonging to same process group,
+ * Sends signal to all process belonging to same process group,
* forcefully terminating the process group.
*
* @param pgrpId process group id
+ * @param signal the signal number to send
*/
- public static void killProcessGroup(String pgrpId) {
+ public static void killProcessGroup(String pgrpId, Signal signal) {
//If process tree is not alive then return immediately.
if(!ProcessTree.isProcessGroupAlive(pgrpId)) {
return;
}
- String[] args = { "kill", "-9", "-"+pgrpId };
+ String[] args = { "kill", "-" + signal.getValue() , "-"+pgrpId };
ShellCommandExecutor shexec = new ShellCommandExecutor(args);
try {
shexec.execute();
} catch (IOException e) {
- LOG.warn("Error sending SIGKILL to process group "+ pgrpId + " ."+
+ LOG.warn("Error sending signal " + signal + " to process group "+
+ pgrpId + " ."+
StringUtils.stringifyException(e));
} finally {
- LOG.info("Killing process group" + pgrpId + " with SIGKILL. Exit code "
- + shexec.getExitCode());
+ LOG.info("Killing process group" + pgrpId + " with signal " + signal +
+ ". Exit code " + shexec.getExitCode());
}
}
@@ -297,7 +202,7 @@ public class ProcessTree {
private String pid = null;
private boolean isProcessGroup = false;
- private long sleepTimeBeforeSigKill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
+ private final long sleepTimeBeforeSigKill;
private SigKillThread(String pid, boolean isProcessGroup, long interval) {
this.pid = pid;
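
A brief, hypothetical sketch of how callers might drive the reworked killProcess/killProcessGroup API with the new Signal enum above; the pid value and the call sequence are illustrative only:

    import org.apache.hadoop.util.ProcessTree;
    import org.apache.hadoop.util.ProcessTree.Signal;

    public class SignalDemo {
      public static void main(String[] args) {
        String pid = args.length > 0 ? args[0] : "12345";   // placeholder pid
        // Ask the task to quit (a JVM will dump its stacks on SIGQUIT) ...
        ProcessTree.killProcess(pid, Signal.QUIT);
        // ... then force-kill the whole process group if it was launched under setsid.
        if (ProcessTree.isSetsidAvailable) {
          ProcessTree.killProcessGroup(pid, Signal.KILL);
        }
      }
    }
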
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java Fri Mar 4 04:43:33 2011
@@ -29,15 +29,11 @@ import java.util.Map;
import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.util.Arrays;
import java.util.LinkedList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.Shell.ExitCodeException;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
-
/**
* A Proc file-system based ProcessTree. Works only on Linux.
*/
@@ -47,8 +43,6 @@ public class ProcfsBasedProcessTree exte
.getLog(ProcfsBasedProcessTree.class);
private static final String PROCFS = "/proc/";
- public static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 5000L;
- private long sleepTimeBeforeSigKill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern
.compile("^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+\\s){16}([0-9]+)(\\s[0-9-]+){16}");
@@ -61,21 +55,14 @@ public class ProcfsBasedProcessTree exte
private Integer pid = -1;
- private boolean setsidUsed = false;
-
- private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
-
private Map<Integer, ProcessInfo> processTree = new HashMap<Integer, ProcessInfo>();
public ProcfsBasedProcessTree(String pid) {
- this(pid, false, DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+ this(pid, false);
}
- public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
- long sigkillInterval) {
+ public ProcfsBasedProcessTree(String pid, boolean setsidUsed) {
this(pid,PROCFS);
- this.setsidUsed = setsidUsed;
- sleeptimeBeforeSigkill = sigkillInterval;
}
public ProcfsBasedProcessTree(String pid, String procfsDir) {
@@ -84,17 +71,6 @@ public class ProcfsBasedProcessTree exte
}
/**
- * Sets SIGKILL interval
- * @deprecated Use {@link ProcfsBasedProcessTree#ProcfsBasedProcessTree(
- * String, boolean, long)} instead
- * @param interval The time to wait before sending SIGKILL
- * after sending SIGTERM
- */
- public void setSigKillInterval(long interval) {
- sleepTimeBeforeSigKill = interval;
- }
-
- /**
* Checks if the ProcfsBasedProcessTree is available on this system.
*
* @return true if ProcfsBasedProcessTree is available. False otherwise.
@@ -218,81 +194,6 @@ public class ProcfsBasedProcessTree exte
return false;
}
- /** Verify that the given process id is same as its process group id.
- * @param pidStr Process id of the to-be-verified-process
- */
- private static boolean assertPidPgrpidForMatch(String pidStr) {
- Integer pId = Integer.parseInt(pidStr);
- // Get information for this process
- ProcessInfo pInfo = new ProcessInfo(pId);
- pInfo = constructProcessInfo(pInfo);
- //make sure that pId and its pgrpId match
- if (!pInfo.getPgrpId().equals(pId)) {
- LOG.warn("Unexpected: Process with PID " + pId +
- " is not a process group leader.");
- return false;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug(pId + " is a process group leader, as expected.");
- }
- return true;
- }
-
- /** Make sure that the given pid is a process group leader and then
- * destroy the process group.
- * @param pgrpId Process group id of to-be-killed-processes
- * @param interval The time to wait before sending SIGKILL
- * after sending SIGTERM
- * @param inBackground Process is to be killed in the back ground with
- * a separate thread
- */
- public static void assertAndDestroyProcessGroup(String pgrpId, long interval,
- boolean inBackground)
- throws IOException {
- // Make sure that the pid given is a process group leader
- if (!assertPidPgrpidForMatch(pgrpId)) {
- throw new IOException("Process with PID " + pgrpId +
- " is not a process group leader.");
- }
- destroyProcessGroup(pgrpId, interval, inBackground);
- }
-
- /**
- * Destroy the process-tree.
- */
- public void destroy() {
- destroy(true);
- }
-
- /**
- * Destroy the process-tree.
- * @param inBackground Process is to be killed in the back ground with
- * a separate thread
- */
- public void destroy(boolean inBackground) {
- LOG.debug("Killing ProcfsBasedProcessTree of " + pid);
- if (pid == -1) {
- return;
- }
- if (isAlive(pid.toString())) {
- if (isSetsidAvailable && setsidUsed) {
- // In this case, we know that pid got created using setsid. So kill the
- // whole processGroup.
- try {
- assertAndDestroyProcessGroup(pid.toString(), sleeptimeBeforeSigkill,
- inBackground);
- } catch (IOException e) {
- LOG.warn(StringUtils.stringifyException(e));
- }
- }
- else {
- //TODO: Destroy all the processes in the subtree in this case also.
- // For the time being, killing only the root process.
- destroyProcess(pid.toString(), sleeptimeBeforeSigkill, inBackground);
- }
- }
- }
-
private static final String PROCESSTREE_DUMP_FORMAT =
"\t|- %d %d %d %d %s %d %s\n";
@@ -383,15 +284,6 @@ public class ProcfsBasedProcessTree exte
}
/**
- *
- * Construct the ProcessInfo using the process' PID and procfs and return the
- * same. Returns null on failing to read from procfs,
- */
- private static ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
- return constructProcessInfo(pinfo, PROCFS);
- }
-
- /**
* Construct the ProcessInfo using the process' PID and procfs rooted at the
* specified directory and return the same. It is provided mainly to assist
* testing purposes.
@@ -449,58 +341,6 @@ public class ProcfsBasedProcessTree exte
}
/**
- * Is the process with PID pid still alive?
- */
- private boolean isAlive(Integer pid) {
- // This method assumes that isAlive is called on a pid that was alive not
- // too long ago, and hence assumes no chance of pid-wrapping-around.
- ShellCommandExecutor shexec = null;
- try {
- String[] args = { "kill", "-0", pid.toString() };
- shexec = new ShellCommandExecutor(args);
- shexec.execute();
- } catch (ExitCodeException ee) {
- return false;
- } catch (IOException ioe) {
- LOG.warn("Error executing shell command "
- + Arrays.toString(shexec.getExecString()) + ioe);
- return false;
- }
- return (shexec.getExitCode() == 0 ? true : false);
- }
-
- /**
- * Helper thread class that kills process-tree with SIGKILL in background
- */
- private class SigKillThread extends Thread {
-
- public void run() {
- this.setName(this.getClass().getName() + "-" + String.valueOf(pid));
- ShellCommandExecutor shexec = null;
-
- try {
- // Sleep for some time before sending SIGKILL
- Thread.sleep(sleepTimeBeforeSigKill);
- } catch (InterruptedException i) {
- LOG.warn("Thread sleep is interrupted.");
- }
-
- // Kill the root process with SIGKILL if it is still alive
- if (ProcfsBasedProcessTree.this.isAlive(pid)) {
- try {
- String[] args = { "kill", "-9", pid.toString() };
- shexec = new ShellCommandExecutor(args);
- shexec.execute();
- } catch (IOException ioe) {
- LOG.warn("Error executing shell command " + ioe);
- } finally {
- LOG.info("Killing " + pid + " with SIGKILL. Exit code "
- + shexec.getExitCode());
- }
- }
- }
- }
- /**
* Returns a string printing PIDs of process present in the
* ProcfsBasedProcessTree. Output format : [pid pid ..]
*/
@@ -563,13 +403,6 @@ public class ProcfsBasedProcessTree exte
return age;
}
- public boolean isParent(ProcessInfo p) {
- if (pid.equals(p.getPpid())) {
- return true;
- }
- return false;
- }
-
public void updateProcessInfo(String name, Integer ppid, Integer pgrpId,
Integer sessionId, Long vmem) {
this.name = name;
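
With destroy() and the SigKill helpers removed, ProcfsBasedProcessTree becomes a pure observer of /proc. A hypothetical usage sketch under that assumption; the pid is a placeholder and getProcessTree() is a pre-existing refresh method assumed unchanged by this patch:

    import org.apache.hadoop.util.ProcfsBasedProcessTree;

    public class ProcTreeDump {
      public static void main(String[] args) {
        String pid = args.length > 0 ? args[0] : "12345";   // placeholder pid
        // Simplified constructor: only the setsid flag remains, no SIGKILL interval.
        ProcfsBasedProcessTree tree = new ProcfsBasedProcessTree(pid, true);
        // getProcessTree() (assumed pre-existing) re-reads /proc;
        // toString() prints the PIDs in the tree as "[ pid pid .. ]".
        tree = tree.getProcessTree();
        System.out.println(tree);
      }
    }
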
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java Fri Mar 4 04:43:33 2011
@@ -21,13 +21,9 @@ package org.apache.hadoop.filecache;
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.*;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.DefaultTaskController;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
import java.net.URI;
@@ -176,179 +172,8 @@ public class DistributedCache {
public static final String CACHE_SYMLINK = "mapred.create.symlink";
/**
- * Get the locally cached file or archive; it could either be
- * previously cached (and valid) or copy it from the {@link FileSystem} now.
- *
- * @param cache the cache to be localized, this should be specified as
- * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema
- * or hostname:port is provided the file is assumed to be in the filesystem
- * being used in the Configuration
- * @param conf The Confguration file which contains the filesystem
- * @param baseDir The base cache Dir where you wnat to localize the files/archives
- * @param fileStatus The file status on the dfs.
- * @param isArchive if the cache is an archive or a file. In case it is an
- * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
- * be unzipped/unjarred/untarred automatically
- * and the directory where the archive is unzipped/unjarred/untarred is
- * returned as the Path.
- * In case of a file, the path to the file is returned
- * @param confFileStamp this is the hdfs file modification timestamp to verify that the
- * file to be cached hasn't changed since the job started
- * @param currentWorkDir this is the directory where you would want to create symlinks
- * for the locally cached files/archives
- * @return the path to directory where the archives are unjarred in case of archives,
- * the path to the file where the file is copied locally
- * @throws IOException
- * @deprecated Internal to MapReduce framework. Use DistributedCacheManager
- * instead.
- */
- public static Path getLocalCache(URI cache, Configuration conf,
- Path baseDir, FileStatus fileStatus,
- boolean isArchive, long confFileStamp,
- Path currentWorkDir)
- throws IOException {
- return getLocalCache(cache, conf, baseDir, fileStatus, isArchive,
- confFileStamp, currentWorkDir, true);
- }
-
- /**
- * Get the locally cached file or archive; it could either be
- * previously cached (and valid) or copy it from the {@link FileSystem} now.
- *
- * @param cache the cache to be localized, this should be specified as
- * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema
- * or hostname:port is provided the file is assumed to be in the filesystem
- * being used in the Configuration
- * @param conf The Confguration file which contains the filesystem
- * @param baseDir The base cache Dir where you wnat to localize the files/archives
- * @param fileStatus The file status on the dfs.
- * @param isArchive if the cache is an archive or a file. In case it is an
- * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
- * be unzipped/unjarred/untarred automatically
- * and the directory where the archive is unzipped/unjarred/untarred is
- * returned as the Path.
- * In case of a file, the path to the file is returned
- * @param confFileStamp this is the hdfs file modification timestamp to verify that the
- * file to be cached hasn't changed since the job started
- * @param currentWorkDir this is the directory where you would want to create symlinks
- * for the locally cached files/archives
- * @param honorSymLinkConf if this is false, then the symlinks are not
- * created even if conf says so (this is required for an optimization in task
- * launches
- * @return the path to directory where the archives are unjarred in case of archives,
- * the path to the file where the file is copied locally
- * @throws IOException
- * @deprecated Internal to MapReduce framework. Use DistributedCacheManager
- * instead.
- */
- public static Path getLocalCache(URI cache, Configuration conf,
- Path baseDir, FileStatus fileStatus,
- boolean isArchive, long confFileStamp,
- Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
-
- return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
- .getLocalCache(cache, conf, baseDir.toString(), fileStatus, isArchive,
- confFileStamp, currentWorkDir, honorSymLinkConf, false);
- }
-
- /**
- * Get the locally cached file or archive; it could either be
- * previously cached (and valid) or copy it from the {@link FileSystem} now.
- *
- * @param cache the cache to be localized, this should be specified as
- * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema
- * or hostname:port is provided the file is assumed to be in the filesystem
- * being used in the Configuration
- * @param conf The Confguration file which contains the filesystem
- * @param baseDir The base cache Dir where you wnat to localize the files/archives
- * @param isArchive if the cache is an archive or a file. In case it is an
- * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
- * be unzipped/unjarred/untarred automatically
- * and the directory where the archive is unzipped/unjarred/untarred
- * is returned as the Path.
- * In case of a file, the path to the file is returned
- * @param confFileStamp this is the hdfs file modification timestamp to verify that the
- * file to be cached hasn't changed since the job started
- * @param currentWorkDir this is the directory where you would want to create symlinks
- * for the locally cached files/archives
- * @return the path to directory where the archives are unjarred in case of archives,
- * the path to the file where the file is copied locally
- * @throws IOException
- * @deprecated Internal to MapReduce framework. Use DistributedCacheManager
- * instead.
- */
- public static Path getLocalCache(URI cache, Configuration conf,
- Path baseDir, boolean isArchive,
- long confFileStamp, Path currentWorkDir)
- throws IOException {
- return getLocalCache(cache, conf,
- baseDir, null, isArchive,
- confFileStamp, currentWorkDir);
- }
-
- /**
- * This is the opposite of getlocalcache. When you are done with
- * using the cache, you need to release the cache
- * @param cache The cache URI to be released
- * @param conf configuration which contains the filesystem the cache
- * is contained in.
- * @throws IOException
- * @deprecated Internal to MapReduce framework. Use DistributedCacheManager
- * instead.
- */
- public static void releaseCache(URI cache, Configuration conf)
- throws IOException {
- // find the timestamp of the uri
- URI[] archives = DistributedCache.getCacheArchives(conf);
- URI[] files = DistributedCache.getCacheFiles(conf);
- String[] archivesTimestamps =
- DistributedCache.getArchiveTimestamps(conf);
- String[] filesTimestamps =
- DistributedCache.getFileTimestamps(conf);
- String timestamp = null;
- if (archives != null) {
- for (int i = 0; i < archives.length; i++) {
- if (archives[i].equals(cache)) {
- timestamp = archivesTimestamps[i];
- break;
- }
- }
- }
- if (timestamp == null && files != null) {
- for (int i = 0; i < files.length; i++) {
- if (files[i].equals(cache)) {
- timestamp = filesTimestamps[i];
- break;
- }
- }
- }
- if (timestamp == null) {
- throw new IOException("TimeStamp of the uri couldnot be found");
- }
- new TrackerDistributedCacheManager(conf, new DefaultTaskController())
- .releaseCache(cache, conf, Long.parseLong(timestamp),
- TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
- }
-
- /**
- * Returns the relative path of the dir this cache will be localized in
- * relative path that this cache will be localized in. For
- * hdfs://hostname:port/absolute_path -- the relative path is
- * hostname/absolute path -- if it is just /absolute_path -- then the
- * relative path is hostname of DFS this mapred cluster is running
- * on/absolute_path
- * @deprecated Internal to MapReduce framework. Use DistributedCacheManager
- * instead.
- */
- @Deprecated
- public static String makeRelative(URI cache, Configuration conf)
- throws IOException {
- return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
- .makeRelative(cache, conf);
- }
-
- /**
- * Returns {@link FileStatus} of a given cache file on hdfs.
+ * Returns {@link FileStatus} of a given cache file on hdfs. Internal to
+ * MapReduce.
* @param conf configuration
* @param cache cache file
* @return <code>FileStatus</code> of a given cache file on hdfs
@@ -357,13 +182,11 @@ public class DistributedCache {
public static FileStatus getFileStatus(Configuration conf, URI cache)
throws IOException {
FileSystem fileSystem = FileSystem.get(cache, conf);
- Path filePath = new Path(cache.getPath());
-
- return fileSystem.getFileStatus(filePath);
+ return fileSystem.getFileStatus(new Path(cache.getPath()));
}
/**
- * Returns mtime of a given cache file on hdfs.
+ * Returns mtime of a given cache file on hdfs. Internal to MapReduce.
* @param conf configuration
* @param cache cache file
* @return mtime of a given cache file on hdfs
@@ -388,17 +211,6 @@ public class DistributedCache {
TrackerDistributedCacheManager.createAllSymlink(conf, jobCacheDir, workDir);
}
- private static String getFileSysName(URI url) {
- String fsname = url.getScheme();
- if ("hdfs".equals(fsname)) {
- String host = url.getHost();
- int port = url.getPort();
- return (port == (-1)) ? host : (host + ":" + port);
- } else {
- return null;
- }
- }
-
/**
* Set the configuration with the given set of archives. Intended
* to be used by user code.
@@ -421,11 +233,22 @@ public class DistributedCache {
conf.set(CACHE_FILES, sfiles);
}
+ private static Path[] parsePaths(String[] strs) {
+ if (strs == null) {
+ return null;
+ }
+ Path[] result = new Path[strs.length];
+ for(int i=0; i < strs.length; ++i) {
+ result[i] = new Path(strs[i]);
+ }
+ return result;
+ }
+
/**
* Get cache archives set in the Configuration. Used by
* internal DistributedCache and MapReduce code.
* @param conf The configuration which contains the archives
- * @return A URI array of the caches set in the Configuration
+ * @return An array of the caches set in the Configuration
* @throws IOException
*/
public static URI[] getCacheArchives(Configuration conf) throws IOException {
@@ -436,7 +259,7 @@ public class DistributedCache {
* Get cache files set in the Configuration. Used by internal
* DistributedCache and MapReduce code.
* @param conf The configuration which contains the files
- * @return A URI array of the files set in the Configuration
+ * @return An array of the files set in the Configuration
* @throws IOException
*/
public static URI[] getCacheFiles(Configuration conf) throws IOException {
@@ -469,26 +292,41 @@ public class DistributedCache {
}
/**
+ * Parse a list of strings into longs.
+ * @param strs the list of strings to parse
+ * @return a list of longs that were parsed. same length as strs.
+ */
+ private static long[] parseTimestamps(String[] strs) {
+ if (strs == null) {
+ return null;
+ }
+ long[] result = new long[strs.length];
+ for(int i=0; i < strs.length; ++i) {
+ result[i] = Long.parseLong(strs[i]);
+ }
+ return result;
+ }
+
+ /**
* Get the timestamps of the archives. Used by internal
* DistributedCache and MapReduce code.
* @param conf The configuration which stored the timestamps
- * @return a string array of timestamps
+ * @return a long array of timestamps
* @throws IOException
*/
- public static String[] getArchiveTimestamps(Configuration conf) {
- return conf.getStrings(CACHE_ARCHIVES_TIMESTAMPS);
+ public static long[] getArchiveTimestamps(Configuration conf) {
+ return parseTimestamps(conf.getStrings(CACHE_ARCHIVES_TIMESTAMPS));
}
-
/**
* Get the timestamps of the files. Used by internal
* DistributedCache and MapReduce code.
* @param conf The configuration which stored the timestamps
- * @return a string array of timestamps
+ * @return a long array of timestamps
* @throws IOException
*/
- public static String[] getFileTimestamps(Configuration conf) {
- return conf.getStrings(CACHE_FILES_TIMESTAMPS);
+ public static long[] getFileTimestamps(Configuration conf) {
+ return parseTimestamps(conf.getStrings(CACHE_FILES_TIMESTAMPS));
}
/**
@@ -532,6 +370,30 @@ public class DistributedCache {
public static void setLocalFiles(Configuration conf, String str) {
conf.set(CACHE_LOCALFILES, str);
}
+
+ /**
+ * Add an archive that has been localized to the conf. Used
+ * by internal DistributedCache code.
+ * @param conf The conf to modify to contain the localized caches
+ * @param str a comma separated list of local archives
+ */
+ public static void addLocalArchives(Configuration conf, String str) {
+ String archives = conf.get(CACHE_LOCALARCHIVES);
+ conf.set(CACHE_LOCALARCHIVES, archives == null ? str
+ : archives + "," + str);
+ }
+
+ /**
+ * Add a file that has been localized to the conf. Used
+ * by internal DistributedCache code.
+ * @param conf The conf to modify to contain the localized caches
+ * @param str a comma separated list of local files
+ */
+ public static void addLocalFiles(Configuration conf, String str) {
+ String files = conf.get(CACHE_LOCALFILES);
+ conf.set(CACHE_LOCALFILES, files == null ? str
+ : files + "," + str);
+ }
/**
* Add a archives to be localized to the conf. Intended to
@@ -606,7 +468,7 @@ public class DistributedCache {
String classpath = conf.get("mapred.job.classpath.files");
if (classpath == null)
return null;
- ArrayList list = Collections.list(new StringTokenizer(classpath, System
+ ArrayList<Object> list = Collections.list(new StringTokenizer(classpath, System
.getProperty("path.separator")));
Path[] paths = new Path[list.size()];
for (int i = 0; i < list.size(); i++) {
@@ -665,7 +527,7 @@ public class DistributedCache {
String classpath = conf.get("mapred.job.classpath.archives");
if (classpath == null)
return null;
- ArrayList list = Collections.list(new StringTokenizer(classpath, System
+ ArrayList<Object> list = Collections.list(new StringTokenizer(classpath, System
.getProperty("path.separator")));
Path[] paths = new Path[list.size()];
for (int i = 0; i < list.size(); i++) {
@@ -747,15 +609,4 @@ public class DistributedCache {
return true;
}
- /**
- * Clear the entire contents of the cache and delete the backing files. This
- * should only be used when the server is reinitializing, because the users
- * are going to lose their files.
- * @deprecated Internal to MapReduce framework. Use DistributedCacheManager
- * instead.
- */
- public static void purgeCache(Configuration conf) throws IOException {
- new TrackerDistributedCacheManager(conf, new DefaultTaskController())
- .purgeCache();
- }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TaskDistributedCacheManager.java Fri Mar 4 04:43:33 2011
@@ -30,19 +30,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobLocalizer;
/**
* Helper class of {@link TrackerDistributedCacheManager} that represents
- * the cached files of a single task. This class is used
- * by TaskRunner/LocalJobRunner to parse out the job configuration
- * and setup the local caches.
+ * the cached files of a single job.
*
* <b>This class is internal to Hadoop, and should not be treated as a public
* interface.</b>
@@ -52,7 +48,7 @@ public class TaskDistributedCacheManager
private final Configuration taskConf;
private final List<CacheFile> cacheFiles = new ArrayList<CacheFile>();
private final List<String> classPaths = new ArrayList<String>();
-
+
private boolean setupCalled = false;
/**
@@ -94,7 +90,7 @@ public class TaskDistributedCacheManager
* files.
*/
private static List<CacheFile> makeCacheFiles(URI[] uris,
- String[] timestamps, String cacheVisibilities[], Path[] paths,
+ long[] timestamps, boolean cacheVisibilities[], Path[] paths,
FileType type) throws IOException {
List<CacheFile> ret = new ArrayList<CacheFile>();
if (uris != null) {
@@ -110,9 +106,8 @@ public class TaskDistributedCacheManager
for (int i = 0; i < uris.length; ++i) {
URI u = uris[i];
boolean isClassPath = (null != classPaths.get(u.getPath()));
- long t = Long.parseLong(timestamps[i]);
- ret.add(new CacheFile(u, type, Boolean.valueOf(cacheVisibilities[i]),
- t, isClassPath));
+ ret.add(new CacheFile(u, type, cacheVisibilities[i],
+ timestamps[i], isClassPath));
}
}
return ret;
@@ -148,36 +143,37 @@ public class TaskDistributedCacheManager
}
/**
- * Retrieve files into the local cache and updates the task configuration
- * (which has been passed in via the constructor).
+ * Retrieves public distributed cache files into the local cache and updates
+ * the task configuration (which has been passed in via the constructor).
+ * The private distributed cache is only examined here: the paths where its
+ * files/archives should go are decided, while the actual localization is
+ * done by {@link JobLocalizer}.
*
* It is the caller's responsibility to re-write the task configuration XML
* file, if necessary.
*/
- public void setup(LocalDirAllocator lDirAlloc, File workDir,
- String privateCacheSubdir, String publicCacheSubDir) throws IOException {
+ public void setupCache(String publicCacheSubdir, String privateCacheSubdir)
+ throws IOException {
setupCalled = true;
-
- if (cacheFiles.isEmpty()) {
- return;
- }
-
ArrayList<Path> localArchives = new ArrayList<Path>();
ArrayList<Path> localFiles = new ArrayList<Path>();
- Path workdirPath = new Path(workDir.getAbsolutePath());
for (CacheFile cacheFile : cacheFiles) {
URI uri = cacheFile.uri;
FileSystem fileSystem = FileSystem.get(uri, taskConf);
FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
- String cacheSubdir = publicCacheSubDir;
- if (!cacheFile.isPublic) {
- cacheSubdir = privateCacheSubdir;
- }
- Path p = distributedCacheManager.getLocalCache(uri, taskConf,
- cacheSubdir, fileStatus,
- cacheFile.type == CacheFile.FileType.ARCHIVE,
- cacheFile.timestamp, workdirPath, false, cacheFile.isPublic);
+ Path p;
+ if (cacheFile.isPublic) {
+ p = distributedCacheManager.getLocalCache(uri, taskConf,
+ publicCacheSubdir, fileStatus,
+ cacheFile.type == CacheFile.FileType.ARCHIVE,
+ cacheFile.timestamp, cacheFile.isPublic);
+ } else {
+ p = distributedCacheManager.getLocalCache(uri, taskConf,
+ privateCacheSubdir, fileStatus,
+ cacheFile.type == CacheFile.FileType.ARCHIVE,
+ cacheFile.timestamp, cacheFile.isPublic);
+ }
cacheFile.setLocalized(true);
if (cacheFile.type == CacheFile.FileType.ARCHIVE) {
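[Editorial aside, not part of the patch: a hedged sketch of how a caller might drive the new setupCache() split. It assumes a TaskDistributedCacheManager instance has already been obtained, and the subdirectory names are illustrative only.]

import java.io.IOException;
import org.apache.hadoop.filecache.TaskDistributedCacheManager;

public class SetupCacheSketch {
  // Public entries are fetched by this call; private entries only get their
  // target paths decided, and JobLocalizer performs the actual copy later.
  static void localize(TaskDistributedCacheManager cacheManager)
      throws IOException {
    cacheManager.setupCache("distcache/public", "distcache/private");
  }
}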
@@ -192,11 +188,11 @@ public class TaskDistributedCacheManager
// Update the configuration object with localized data.
if (!localArchives.isEmpty()) {
- DistributedCache.setLocalArchives(taskConf,
+ DistributedCache.addLocalArchives(taskConf,
stringifyPathList(localArchives));
}
if (!localFiles.isEmpty()) {
- DistributedCache.setLocalFiles(taskConf, stringifyPathList(localFiles));
+ DistributedCache.addLocalFiles(taskConf, stringifyPathList(localFiles));
}
}
@@ -232,6 +228,28 @@ public class TaskDistributedCacheManager
}
return classPaths;
}
+
+ private List<String> formClasspath(Path[] paths, URI[] uris,
+ Path[] localizedFiles) {
+ if (uris == null) {
+ return new ArrayList<String>();
+ }
+ Map<String, Path> clMap = new HashMap<String, Path>();
+ List<String> classPaths = new ArrayList<String>();
+ if (paths != null) {
+ for (Path p : paths) {
+ clMap.put(p.toUri().getPath().toString(), p);
+ }
+ }
+ for (int i = 0; i < uris.length; ++i) {
+ URI u = uris[i];
+ boolean isClassPath = (null != clMap.get(u.getPath()));
+ if (isClassPath) {
+ classPaths.add(localizedFiles[i].toString());
+ }
+ }
+ return classPaths;
+ }
/**
* Releases the cached files/archives, so that space
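[Editorial aside, not part of the patch: a self-contained illustration of the filtering the new formClasspath overload performs. A localized path is kept only when its original URI matches one of the job's classpath entries; all paths and URIs below are made up.]

import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;

public class ClasspathFilterSketch {
  public static void main(String[] args) {
    Path[] classpathEntries = { new Path("/user/alice/lib/job.jar") };
    URI[] cacheUris = {
        URI.create("hdfs://nn:8020/user/alice/lib/job.jar"),
        URI.create("hdfs://nn:8020/user/alice/data/dict.txt") };
    Path[] localized = {
        new Path("/local/cache/job.jar"),
        new Path("/local/cache/dict.txt") };

    // Index the requested classpath entries by their plain path, as the
    // patch does, then keep only localized files whose URI path matches.
    Map<String, Path> clMap = new HashMap<String, Path>();
    for (Path p : classpathEntries) {
      clMap.put(p.toUri().getPath(), p);
    }
    List<String> classPaths = new ArrayList<String>();
    for (int i = 0; i < cacheUris.length; ++i) {
      if (clMap.get(cacheUris[i].getPath()) != null) {
        classPaths.add(localized[i].toString());
      }
    }
    System.out.println(classPaths);  // prints [/local/cache/job.jar]
  }
}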
@@ -246,6 +264,16 @@ public class TaskDistributedCacheManager
}
}
+ public void setSizes(long[] sizes) throws IOException {
+ int i = 0;
+ for (CacheFile c: cacheFiles) {
+ if (!c.isPublic) {
+ distributedCacheManager.setSize(c.uri, taskConf, c.timestamp, c.owner,
+ sizes[i++]);
+ }
+ }
+ }
+
/**
* Creates a class loader that includes the designated
* files and archives.