You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 06:56:31 UTC
svn commit: r1079211 [7/11] - in /hadoop/mapreduce/branches/yahoo-merge: ./
src/c++/task-controller/ src/c++/task-controller/impl/
src/c++/task-controller/test/ src/c++/task-controller/tests/
src/contrib/fairscheduler/designdoc/ src/contrib/streaming/s...
Added: hadoop/mapreduce/branches/yahoo-merge/src/c++/task-controller/missing
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/c%2B%2B/task-controller/missing?rev=1079211&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/c++/task-controller/missing (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/c++/task-controller/missing Tue Mar 8 05:56:27 2011
@@ -0,0 +1,360 @@
+#! /bin/sh
+# Common stub for a few missing GNU programs while installing.
+
+scriptversion=2005-06-08.21
+
+# Copyright (C) 1996, 1997, 1999, 2000, 2002, 2003, 2004, 2005
+# Free Software Foundation, Inc.
+# Originally by François Pinard <pi...@iro.umontreal.ca>, 1996.
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+# As a special exception to the GNU General Public License, if you
+# distribute this file as part of a program that contains a
+# configuration script generated by Autoconf, you may include it under
+# the same distribution terms that you use for the rest of that program.
+
+if test $# -eq 0; then
+ echo 1>&2 "Try \`$0 --help' for more information"
+ exit 1
+fi
+
+run=:
+
+# In the cases where this matters, `missing' is being run in the
+# srcdir already.
+if test -f configure.ac; then
+ configure_ac=configure.ac
+else
+ configure_ac=configure.in
+fi
+
+msg="missing on your system"
+
+case "$1" in
+--run)
+ # Try to run requested program, and just exit if it succeeds.
+ run=
+ shift
+ "$@" && exit 0
+ # Exit code 63 means version mismatch. This often happens
+ # when the user tries to use an ancient version of a tool on
+ # a file that requires a minimum version. In this case
+ # we should proceed as if the program had been absent, or
+ # if --run hadn't been passed.
+ if test $? = 63; then
+ run=:
+ msg="probably too old"
+ fi
+ ;;
+
+ -h|--h|--he|--hel|--help)
+ echo "\
+$0 [OPTION]... PROGRAM [ARGUMENT]...
+
+Handle \`PROGRAM [ARGUMENT]...' for when PROGRAM is missing, or return an
+error status if there is no known handling for PROGRAM.
+
+Options:
+ -h, --help display this help and exit
+ -v, --version output version information and exit
+ --run try to run the given command, and emulate it if it fails
+
+Supported PROGRAM values:
+ aclocal touch file \`aclocal.m4'
+ autoconf touch file \`configure'
+ autoheader touch file \`config.h.in'
+ automake touch all \`Makefile.in' files
+ bison create \`y.tab.[ch]', if possible, from existing .[ch]
+ flex create \`lex.yy.c', if possible, from existing .c
+ help2man touch the output file
+ lex create \`lex.yy.c', if possible, from existing .c
+ makeinfo touch the output file
+ tar try tar, gnutar, gtar, then tar without non-portable flags
+ yacc create \`y.tab.[ch]', if possible, from existing .[ch]
+
+Send bug reports to <bu...@gnu.org>."
+ exit $?
+ ;;
+
+ -v|--v|--ve|--ver|--vers|--versi|--versio|--version)
+ echo "missing $scriptversion (GNU Automake)"
+ exit $?
+ ;;
+
+ -*)
+ echo 1>&2 "$0: Unknown \`$1' option"
+ echo 1>&2 "Try \`$0 --help' for more information"
+ exit 1
+ ;;
+
+esac
+
+# Now exit if we have it, but it failed. Also exit now if we
+# don't have it and --version was passed (most likely to detect
+# the program).
+case "$1" in
+ lex|yacc)
+ # Not GNU programs, they don't have --version.
+ ;;
+
+ tar)
+ if test -n "$run"; then
+ echo 1>&2 "ERROR: \`tar' requires --run"
+ exit 1
+ elif test "x$2" = "x--version" || test "x$2" = "x--help"; then
+ exit 1
+ fi
+ ;;
+
+ *)
+ if test -z "$run" && ($1 --version) > /dev/null 2>&1; then
+ # We have it, but it failed.
+ exit 1
+ elif test "x$2" = "x--version" || test "x$2" = "x--help"; then
+ # Could not run --version or --help. This is probably someone
+ # running `$TOOL --version' or `$TOOL --help' to check whether
+ # $TOOL exists and not knowing $TOOL uses missing.
+ exit 1
+ fi
+ ;;
+esac
+
+# If it does not exist, or fails to run (possibly an outdated version),
+# try to emulate it.
+case "$1" in
+ aclocal*)
+ echo 1>&2 "\
+WARNING: \`$1' is $msg. You should only need it if
+ you modified \`acinclude.m4' or \`${configure_ac}'. You might want
+ to install the \`Automake' and \`Perl' packages. Grab them from
+ any GNU archive site."
+ touch aclocal.m4
+ ;;
+
+ autoconf)
+ echo 1>&2 "\
+WARNING: \`$1' is $msg. You should only need it if
+ you modified \`${configure_ac}'. You might want to install the
+ \`Autoconf' and \`GNU m4' packages. Grab them from any GNU
+ archive site."
+ touch configure
+ ;;
+
+ autoheader)
+ echo 1>&2 "\
+WARNING: \`$1' is $msg. You should only need it if
+ you modified \`acconfig.h' or \`${configure_ac}'. You might want
+ to install the \`Autoconf' and \`GNU m4' packages. Grab them
+ from any GNU archive site."
+ files=`sed -n 's/^[ ]*A[CM]_CONFIG_HEADER(\([^)]*\)).*/\1/p' ${configure_ac}`
+ test -z "$files" && files="config.h"
+ touch_files=
+ for f in $files; do
+ case "$f" in
+ *:*) touch_files="$touch_files "`echo "$f" |
+ sed -e 's/^[^:]*://' -e 's/:.*//'`;;
+ *) touch_files="$touch_files $f.in";;
+ esac
+ done
+ touch $touch_files
+ ;;
+
+ automake*)
+ echo 1>&2 "\
+WARNING: \`$1' is $msg. You should only need it if
+ you modified \`Makefile.am', \`acinclude.m4' or \`${configure_ac}'.
+ You might want to install the \`Automake' and \`Perl' packages.
+ Grab them from any GNU archive site."
+ find . -type f -name Makefile.am -print |
+ sed 's/\.am$/.in/' |
+ while read f; do touch "$f"; done
+ ;;
+
+ autom4te)
+ echo 1>&2 "\
+WARNING: \`$1' is needed, but is $msg.
+ You might have modified some files without having the
+ proper tools for further handling them.
+ You can get \`$1' as part of \`Autoconf' from any GNU
+ archive site."
+
+ file=`echo "$*" | sed -n 's/.*--output[ =]*\([^ ]*\).*/\1/p'`
+ test -z "$file" && file=`echo "$*" | sed -n 's/.*-o[ ]*\([^ ]*\).*/\1/p'`
+ if test -f "$file"; then
+ touch $file
+ else
+ test -z "$file" || exec >$file
+ echo "#! /bin/sh"
+ echo "# Created by GNU Automake missing as a replacement of"
+ echo "# $ $@"
+ echo "exit 0"
+ chmod +x $file
+ exit 1
+ fi
+ ;;
+
+ bison|yacc)
+ echo 1>&2 "\
+WARNING: \`$1' is $msg. You should only need it if
+ you modified a \`.y' file. You may need the \`Bison' package
+ in order for those modifications to take effect. You can get
+ \`Bison' from any GNU archive site."
+ rm -f y.tab.c y.tab.h
+ if [ $# -ne 1 ]; then
+ eval LASTARG="\${$#}"
+ case "$LASTARG" in
+ *.y)
+ SRCFILE=`echo "$LASTARG" | sed 's/y$/c/'`
+ if [ -f "$SRCFILE" ]; then
+ cp "$SRCFILE" y.tab.c
+ fi
+ SRCFILE=`echo "$LASTARG" | sed 's/y$/h/'`
+ if [ -f "$SRCFILE" ]; then
+ cp "$SRCFILE" y.tab.h
+ fi
+ ;;
+ esac
+ fi
+ if [ ! -f y.tab.h ]; then
+ echo >y.tab.h
+ fi
+ if [ ! -f y.tab.c ]; then
+ echo 'main() { return 0; }' >y.tab.c
+ fi
+ ;;
+
+ lex|flex)
+ echo 1>&2 "\
+WARNING: \`$1' is $msg. You should only need it if
+ you modified a \`.l' file. You may need the \`Flex' package
+ in order for those modifications to take effect. You can get
+ \`Flex' from any GNU archive site."
+ rm -f lex.yy.c
+ if [ $# -ne 1 ]; then
+ eval LASTARG="\${$#}"
+ case "$LASTARG" in
+ *.l)
+ SRCFILE=`echo "$LASTARG" | sed 's/l$/c/'`
+ if [ -f "$SRCFILE" ]; then
+ cp "$SRCFILE" lex.yy.c
+ fi
+ ;;
+ esac
+ fi
+ if [ ! -f lex.yy.c ]; then
+ echo 'main() { return 0; }' >lex.yy.c
+ fi
+ ;;
+
+ help2man)
+ echo 1>&2 "\
+WARNING: \`$1' is $msg. You should only need it if
+ you modified a dependency of a manual page. You may need the
+ \`Help2man' package in order for those modifications to take
+ effect. You can get \`Help2man' from any GNU archive site."
+
+ file=`echo "$*" | sed -n 's/.*-o \([^ ]*\).*/\1/p'`
+ if test -z "$file"; then
+ file=`echo "$*" | sed -n 's/.*--output=\([^ ]*\).*/\1/p'`
+ fi
+ if [ -f "$file" ]; then
+ touch $file
+ else
+ test -z "$file" || exec >$file
+ echo ".ab help2man is required to generate this page"
+ exit 1
+ fi
+ ;;
+
+ makeinfo)
+ echo 1>&2 "\
+WARNING: \`$1' is $msg. You should only need it if
+ you modified a \`.texi' or \`.texinfo' file, or any other file
+ indirectly affecting the aspect of the manual. The spurious
+ call might also be the consequence of using a buggy \`make' (AIX,
+ DU, IRIX). You might want to install the \`Texinfo' package or
+ the \`GNU make' package. Grab either from any GNU archive site."
+ # The file to touch is that specified with -o ...
+ file=`echo "$*" | sed -n 's/.*-o \([^ ]*\).*/\1/p'`
+ if test -z "$file"; then
+ # ... or it is the one specified with @setfilename ...
+ infile=`echo "$*" | sed 's/.* \([^ ]*\) *$/\1/'`
+ file=`sed -n '/^@setfilename/ { s/.* \([^ ]*\) *$/\1/; p; q; }' $infile`
+ # ... or it is derived from the source name (dir/f.texi becomes f.info)
+ test -z "$file" && file=`echo "$infile" | sed 's,.*/,,;s,.[^.]*$,,'`.info
+ fi
+ # If the file does not exist, the user really needs makeinfo;
+ # let's fail without touching anything.
+ test -f $file || exit 1
+ touch $file
+ ;;
+
+ tar)
+ shift
+
+ # We have already tried tar in the generic part.
+ # Look for gnutar/gtar before invocation to avoid ugly error
+ # messages.
+ if (gnutar --version > /dev/null 2>&1); then
+ gnutar "$@" && exit 0
+ fi
+ if (gtar --version > /dev/null 2>&1); then
+ gtar "$@" && exit 0
+ fi
+ firstarg="$1"
+ if shift; then
+ case "$firstarg" in
+ *o*)
+ firstarg=`echo "$firstarg" | sed s/o//`
+ tar "$firstarg" "$@" && exit 0
+ ;;
+ esac
+ case "$firstarg" in
+ *h*)
+ firstarg=`echo "$firstarg" | sed s/h//`
+ tar "$firstarg" "$@" && exit 0
+ ;;
+ esac
+ fi
+
+ echo 1>&2 "\
+WARNING: I can't seem to be able to run \`tar' with the given arguments.
+ You may want to install GNU tar or Free paxutils, or check the
+ command line arguments."
+ exit 1
+ ;;
+
+ *)
+ echo 1>&2 "\
+WARNING: \`$1' is needed, and is $msg.
+ You might have modified some files without having the
+ proper tools for further handling them. Check the \`README' file,
+ it often tells you about the needed prerequisites for installing
+ this package. You may also peek at any GNU archive site, in case
+ some other package would contain this missing \`$1' program."
+ exit 1
+ ;;
+esac
+
+exit 0
+
+# Local variables:
+# eval: (add-hook 'write-file-hooks 'time-stamp)
+# time-stamp-start: "scriptversion="
+# time-stamp-format: "%:y-%02m-%02d.%02H"
+# time-stamp-end: "$"
+# End:
Propchange: hadoop/mapreduce/branches/yahoo-merge/src/c++/task-controller/missing
------------------------------------------------------------------------------
svn:executable = *
Added: hadoop/mapreduce/branches/yahoo-merge/src/c++/task-controller/test/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/c%2B%2B/task-controller/test/test-task-controller.c?rev=1079211&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/c++/task-controller/test/test-task-controller.c (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/c++/task-controller/test/test-task-controller.c Tue Mar 8 05:56:27 2011
@@ -0,0 +1,763 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "configuration.h"
+#include "task-controller.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
+
+#define TEST_ROOT "/tmp/test-task-controller"
+#define DONT_TOUCH_FILE "dont-touch-me"
+
+static char* username = NULL;
+
+/**
+ * Run the command using the effective user id.
+ * It can't use system, since bash seems to copy the real user id into the
+ * effective id.
+ */
+void run(const char *cmd) {
+ fflush(stdout);
+ fflush(stderr);
+ pid_t child = fork();
+ if (child == -1) {
+ printf("FAIL: failed to fork - %s\n", strerror(errno));
+ } else if (child == 0) {
+ char *cmd_copy = strdup(cmd);
+ char *ptr;
+ int words = 1;
+ for(ptr = strchr(cmd_copy, ' '); ptr; ptr = strchr(ptr+1, ' ')) {
+ words += 1;
+ }
+ char **argv = malloc(sizeof(char *) * (words + 1));
+ ptr = strtok(cmd_copy, " ");
+ int i = 0;
+ argv[i++] = ptr;
+ while (ptr != NULL) {
+ ptr = strtok(NULL, " ");
+ argv[i++] = ptr;
+ }
+ if (execvp(argv[0], argv) != 0) {
+ printf("FAIL: exec failed in child %s - %s\n", cmd, strerror(errno));
+ exit(42);
+ }
+ } else {
+ int status = 0;
+ if (waitpid(child, &status, 0) <= 0) {
+ printf("FAIL: failed waiting for child process %s pid %d - %s\n",
+ cmd, child, strerror(errno));
+ exit(1);
+ }
+ if (!WIFEXITED(status)) {
+ printf("FAIL: process %s pid %d did not exit\n", cmd, child);
+ exit(1);
+ }
+ if (WEXITSTATUS(status) != 0) {
+ printf("FAIL: process %s pid %d exited with error status %d\n", cmd,
+ child, WEXITSTATUS(status));
+ exit(1);
+ }
+ }
+}
+
+int write_config_file(char *file_name) {
+ FILE *file;
+ file = fopen(file_name, "w");
+ if (file == NULL) {
+ printf("Failed to open %s.\n", file_name);
+ return EXIT_FAILURE;
+ }
+ fprintf(file, "mapred.local.dir=" TEST_ROOT "/local-1");
+ int i;
+ for(i=2; i < 5; ++i) {
+ fprintf(file, "," TEST_ROOT "/local-%d", i);
+ }
+ fprintf(file, "\n");
+ fprintf(file, "hadoop.log.dir=" TEST_ROOT "/logs\n");
+ fclose(file);
+ return 0;
+}
+
+void create_tt_roots() {
+ char** tt_roots = get_values("mapred.local.dir");
+ char** tt_root;
+ for(tt_root=tt_roots; *tt_root != NULL; ++tt_root) {
+ if (mkdir(*tt_root, 0755) != 0) {
+ printf("FAIL: Can't create directory %s - %s\n", *tt_root,
+ strerror(errno));
+ exit(1);
+ }
+ char buffer[100000];
+ sprintf(buffer, "%s/taskTracker", *tt_root);
+ if (mkdir(buffer, 0755) != 0) {
+ printf("FAIL: Can't create directory %s - %s\n", buffer,
+ strerror(errno));
+ exit(1);
+ }
+ }
+ free_values(tt_roots);
+}
+
+void test_get_user_directory() {
+ char *user_dir = get_user_directory("/tmp", "user");
+ char *expected = "/tmp/taskTracker/user";
+ if (strcmp(user_dir, expected) != 0) {
+ printf("test_get_user_directory expected %s got %s\n", user_dir, expected);
+ exit(1);
+ }
+ free(user_dir);
+}
+
+void test_get_job_directory() {
+ char *expected = "/tmp/taskTracker/user/jobcache/job_200906101234_0001";
+ char *job_dir = (char *) get_job_directory("/tmp", "user",
+ "job_200906101234_0001");
+ if (strcmp(job_dir, expected) != 0) {
+ exit(1);
+ }
+ free(job_dir);
+}
+
+void test_get_attempt_directory() {
+ char *attempt_dir = get_attempt_work_directory("/tmp", "owen", "job_1",
+ "attempt_1");
+ char *expected = "/tmp/taskTracker/owen/jobcache/job_1/attempt_1/work";
+ if (strcmp(attempt_dir, expected) != 0) {
+ printf("Fail get_attempt_work_directory got %s expected %s\n",
+ attempt_dir, expected);
+ }
+ free(attempt_dir);
+}
+
+void test_get_task_launcher_file() {
+ char *expected_file = ("/tmp/taskTracker/user/jobcache/job_200906101234_0001"
+ "/taskjvm.sh");
+ char *job_dir = get_job_directory("/tmp", "user",
+ "job_200906101234_0001");
+ char *task_file = get_task_launcher_file(job_dir);
+ if (strcmp(task_file, expected_file) != 0) {
+ printf("failure to match expected task file %s vs %s\n", task_file,
+ expected_file);
+ exit(1);
+ }
+ free(job_dir);
+ free(task_file);
+}
+
+void test_get_job_log_dir() {
+ char *expected = TEST_ROOT "/logs/userlogs/job_200906101234_0001";
+ char *logdir = get_job_log_directory("job_200906101234_0001");
+ if (strcmp(logdir, expected) != 0) {
+ printf("Fail get_job_log_dir got %s expected %s\n", logdir, expected);
+ exit(1);
+ }
+ free(logdir);
+}
+
+void test_get_task_log_dir() {
+ char *logdir = get_job_log_directory("job_5/task_4");
+ char *expected = TEST_ROOT "/logs/userlogs/job_5/task_4";
+ if (strcmp(logdir, expected) != 0) {
+ printf("FAIL: get_task_log_dir expected %s got %s\n", logdir, expected);
+ }
+ free(logdir);
+}
+
+void test_check_user() {
+ printf("\nTesting test_check_user\n");
+ struct passwd *user = check_user(username);
+ if (user == NULL) {
+ printf("FAIL: failed check for user %s\n", username);
+ exit(1);
+ }
+ free(user);
+ if (check_user("lp") != NULL) {
+ printf("FAIL: failed check for system user lp\n");
+ exit(1);
+ }
+ if (check_user("root") != NULL) {
+ printf("FAIL: failed check for system user root\n");
+ exit(1);
+ }
+ if (check_user("mapred") != NULL) {
+ printf("FAIL: failed check for hadoop user mapred\n");
+ exit(1);
+ }
+}
+
+void test_check_configuration_permissions() {
+ printf("\nTesting check_configuration_permissions\n");
+ if (check_configuration_permissions("/etc/passwd") != 0) {
+ printf("FAIL: failed permission check on /etc/passwd\n");
+ exit(1);
+ }
+ if (check_configuration_permissions(TEST_ROOT) == 0) {
+ printf("FAIL: failed permission check on %s\n", TEST_ROOT);
+ exit(1);
+ }
+}
+
+void test_delete_task() {
+ if (initialize_user(username)) {
+ printf("FAIL: failed to initialized user %s\n", username);
+ exit(1);
+ }
+ char* job_dir = get_job_directory(TEST_ROOT "/local-2", username, "job_1");
+ char* dont_touch = get_job_directory(TEST_ROOT "/local-2", username,
+ DONT_TOUCH_FILE);
+ char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-2",
+ username, "job_1", "task_1");
+ char buffer[100000];
+ sprintf(buffer, "mkdir -p %s/who/let/the/dogs/out/who/who", task_dir);
+ run(buffer);
+ sprintf(buffer, "touch %s", dont_touch);
+ run(buffer);
+
+ // soft link to the canary file from the task directory
+ sprintf(buffer, "ln -s %s %s/who/softlink", dont_touch, task_dir);
+ run(buffer);
+ // hard link to the canary file from the task directory
+ sprintf(buffer, "ln %s %s/who/hardlink", dont_touch, task_dir);
+ run(buffer);
+ // create a dot file in the task directory
+ sprintf(buffer, "touch %s/who/let/.dotfile", task_dir);
+ run(buffer);
+ // create a no permission file
+ sprintf(buffer, "touch %s/who/let/protect", task_dir);
+ run(buffer);
+ sprintf(buffer, "chmod 000 %s/who/let/protect", task_dir);
+ run(buffer);
+ // create a no permission directory
+ sprintf(buffer, "chmod 000 %s/who/let", task_dir);
+ run(buffer);
+
+ // delete task directory
+ int ret = delete_as_user(username, "jobcache/job_1/task_1");
+ if (ret != 0) {
+ printf("FAIL: return code from delete_as_user is %d\n", ret);
+ exit(1);
+ }
+
+ // check to make sure the task directory is gone
+ if (access(task_dir, R_OK) == 0) {
+ printf("FAIL: failed to delete the directory - %s\n", task_dir);
+ exit(1);
+ }
+ // check to make sure the job directory is not gone
+ if (access(job_dir, R_OK) != 0) {
+ printf("FAIL: accidently deleted the directory - %s\n", job_dir);
+ exit(1);
+ }
+ // but that the canary is not gone
+ if (access(dont_touch, R_OK) != 0) {
+ printf("FAIL: accidently deleted file %s\n", dont_touch);
+ exit(1);
+ }
+ sprintf(buffer, "chmod -R 700 %s", job_dir);
+ run(buffer);
+ sprintf(buffer, "rm -fr %s", job_dir);
+ run(buffer);
+ free(job_dir);
+ free(task_dir);
+ free(dont_touch);
+}
+
+void test_delete_job() {
+ char* job_dir = get_job_directory(TEST_ROOT "/local-2", username, "job_2");
+ char* dont_touch = get_job_directory(TEST_ROOT "/local-2", username,
+ DONT_TOUCH_FILE);
+ char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-2",
+ username, "job_2", "task_1");
+ char buffer[100000];
+ sprintf(buffer, "mkdir -p %s/who/let/the/dogs/out/who/who", task_dir);
+ run(buffer);
+ sprintf(buffer, "touch %s", dont_touch);
+ run(buffer);
+
+ // soft link to the canary file from the task directory
+ sprintf(buffer, "ln -s %s %s/who/softlink", dont_touch, task_dir);
+ run(buffer);
+ // hard link to the canary file from the task directory
+ sprintf(buffer, "ln %s %s/who/hardlink", dont_touch, task_dir);
+ run(buffer);
+ // create a dot file in the task directory
+ sprintf(buffer, "touch %s/who/let/.dotfile", task_dir);
+ run(buffer);
+ // create a no permission file
+ sprintf(buffer, "touch %s/who/let/protect", task_dir);
+ run(buffer);
+ sprintf(buffer, "chmod 000 %s/who/let/protect", task_dir);
+ run(buffer);
+ // create a no permission directory
+ sprintf(buffer, "chmod 000 %s/who/let", task_dir);
+ run(buffer);
+
+ // delete task directory
+ int ret = delete_as_user(username, "jobcache/job_2");
+ if (ret != 0) {
+ printf("FAIL: return code from delete_as_user is %d\n", ret);
+ exit(1);
+ }
+
+ // check to make sure the task directory is gone
+ if (access(task_dir, R_OK) == 0) {
+ printf("FAIL: failed to delete the directory - %s\n", task_dir);
+ exit(1);
+ }
+ // check to make sure the job directory is gone
+ if (access(job_dir, R_OK) == 0) {
+ printf("FAIL: didn't delete the directory - %s\n", job_dir);
+ exit(1);
+ }
+ // but that the canary is not gone
+ if (access(dont_touch, R_OK) != 0) {
+ printf("FAIL: accidently deleted file %s\n", dont_touch);
+ exit(1);
+ }
+ free(job_dir);
+ free(task_dir);
+ free(dont_touch);
+}
+
+
+void test_delete_user() {
+ printf("\nTesting delete_user\n");
+ char* job_dir = get_job_directory(TEST_ROOT "/local-1", username, "job_3");
+ if (mkdirs(job_dir, 0700) != 0) {
+ exit(1);
+ }
+ char buffer[100000];
+ sprintf(buffer, "%s/local-1/taskTracker/%s", TEST_ROOT, username);
+ if (access(buffer, R_OK) != 0) {
+ printf("FAIL: directory missing before test\n");
+ exit(1);
+ }
+ if (delete_as_user(username, "") != 0) {
+ exit(1);
+ }
+ if (access(buffer, R_OK) == 0) {
+ printf("FAIL: directory not deleted\n");
+ exit(1);
+ }
+ if (access(TEST_ROOT "/local-1", R_OK) != 0) {
+ printf("FAIL: local-1 directory does not exist\n");
+ exit(1);
+ }
+ free(job_dir);
+}
+
+void test_delete_log_directory() {
+ printf("\nTesting delete_log_directory\n");
+ char *job_log_dir = get_job_log_directory("job_1");
+ if (job_log_dir == NULL) {
+ exit(1);
+ }
+ if (create_directory_for_user(job_log_dir) != 0) {
+ exit(1);
+ }
+ free(job_log_dir);
+ char *task_log_dir = get_job_log_directory("job_1/task_2");
+ if (task_log_dir == NULL) {
+ exit(1);
+ }
+ if (mkdirs(task_log_dir, 0700) != 0) {
+ exit(1);
+ }
+ if (access(TEST_ROOT "/logs/userlogs/job_1/task_2", R_OK) != 0) {
+ printf("FAIL: can't access task directory - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (delete_log_directory("job_1/task_2") != 0) {
+ printf("FAIL: can't delete task directory\n");
+ exit(1);
+ }
+ if (access(TEST_ROOT "/logs/userlogs/job_1/task_2", R_OK) == 0) {
+ printf("FAIL: task directory not deleted\n");
+ exit(1);
+ }
+ if (access(TEST_ROOT "/logs/userlogs/job_1", R_OK) != 0) {
+ printf("FAIL: job directory not deleted - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (delete_log_directory("job_1") != 0) {
+ printf("FAIL: can't delete task directory\n");
+ exit(1);
+ }
+ if (access(TEST_ROOT "/logs/userlogs/job_1", R_OK) == 0) {
+ printf("FAIL: job directory not deleted\n");
+ exit(1);
+ }
+ free(task_log_dir);
+}
+
+void run_test_in_child(const char* test_name, void (*func)()) {
+ printf("\nRunning test %s in child process\n", test_name);
+ fflush(stdout);
+ fflush(stderr);
+ pid_t child = fork();
+ if (child == -1) {
+ printf("FAIL: fork failed\n");
+ exit(1);
+ } else if (child == 0) {
+ func();
+ exit(0);
+ } else {
+ int status = 0;
+ if (waitpid(child, &status, 0) == -1) {
+ printf("FAIL: waitpid %d failed - %s\n", child, strerror(errno));
+ exit(1);
+ }
+ if (!WIFEXITED(status)) {
+ printf("FAIL: child %d didn't exit - %d\n", child, status);
+ exit(1);
+ }
+ if (WEXITSTATUS(status) != 0) {
+ printf("FAIL: child %d exited with bad status %d\n",
+ child, WEXITSTATUS(status));
+ exit(1);
+ }
+ }
+}
+
+void test_signal_task() {
+ printf("\nTesting signal_task\n");
+ fflush(stdout);
+ fflush(stderr);
+ pid_t child = fork();
+ if (child == -1) {
+ printf("FAIL: fork failed\n");
+ exit(1);
+ } else if (child == 0) {
+ if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+ exit(1);
+ }
+ sleep(3600);
+ exit(0);
+ } else {
+ printf("Child task launched as %d\n", child);
+ if (signal_user_task(username, child, SIGQUIT) != 0) {
+ exit(1);
+ }
+ int status = 0;
+ if (waitpid(child, &status, 0) == -1) {
+ printf("FAIL: waitpid failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (!WIFSIGNALED(status)) {
+ printf("FAIL: child wasn't signalled - %d\n", status);
+ exit(1);
+ }
+ if (WTERMSIG(status) != SIGQUIT) {
+ printf("FAIL: child was killed with %d instead of %d\n",
+ WTERMSIG(status), SIGQUIT);
+ exit(1);
+ }
+ }
+}
+
+void test_signal_task_group() {
+ printf("\nTesting group signal_task\n");
+ fflush(stdout);
+ fflush(stderr);
+ pid_t child = fork();
+ if (child == -1) {
+ printf("FAIL: fork failed\n");
+ exit(1);
+ } else if (child == 0) {
+ setpgrp();
+ if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+ exit(1);
+ }
+ sleep(3600);
+ exit(0);
+ }
+ printf("Child task launched as %d\n", child);
+ if (signal_user_task(username, child, SIGKILL) != 0) {
+ exit(1);
+ }
+ int status = 0;
+ if (waitpid(child, &status, 0) == -1) {
+ printf("FAIL: waitpid failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (!WIFSIGNALED(status)) {
+ printf("FAIL: child wasn't signalled - %d\n", status);
+ exit(1);
+ }
+ if (WTERMSIG(status) != SIGKILL) {
+ printf("FAIL: child was killed with %d instead of %d\n",
+ WTERMSIG(status), SIGKILL);
+ exit(1);
+ }
+}
+
+void test_init_job() {
+ printf("\nTesting init job\n");
+ if (seteuid(0) != 0) {
+ printf("FAIL: seteuid to root failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ FILE* creds = fopen(TEST_ROOT "/creds.txt", "w");
+ if (creds == NULL) {
+ printf("FAIL: failed to create credentials file - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (fprintf(creds, "secret key\n") < 0) {
+ printf("FAIL: fprintf failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (fclose(creds) != 0) {
+ printf("FAIL: fclose failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ FILE* job_xml = fopen(TEST_ROOT "/job.xml", "w");
+ if (job_xml == NULL) {
+ printf("FAIL: failed to create job file - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (fprintf(job_xml, "<jobconf/>\n") < 0) {
+ printf("FAIL: fprintf failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (fclose(job_xml) != 0) {
+ printf("FAIL: fclose failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (seteuid(user_detail->pw_uid) != 0) {
+ printf("FAIL: failed to seteuid back to user - %s\n", strerror(errno));
+ exit(1);
+ }
+ fflush(stdout);
+ fflush(stderr);
+ pid_t child = fork();
+ if (child == -1) {
+ printf("FAIL: failed to fork process for init_job - %s\n",
+ strerror(errno));
+ exit(1);
+ } else if (child == 0) {
+ char *final_pgm[] = {"touch", "my-touch-file", 0};
+ if (initialize_job(username, "job_4", TEST_ROOT "/creds.txt",
+ TEST_ROOT "/job.xml", final_pgm) != 0) {
+ printf("FAIL: failed in child\n");
+ exit(42);
+ }
+ // should never return
+ exit(1);
+ }
+ int status = 0;
+ if (waitpid(child, &status, 0) <= 0) {
+ printf("FAIL: failed waiting for process %d - %s\n", child,
+ strerror(errno));
+ exit(1);
+ }
+ if (access(TEST_ROOT "/logs/userlogs/job_4", R_OK) != 0) {
+ printf("FAIL: failed to create job log directory\n");
+ exit(1);
+ }
+ char* job_dir = get_job_directory(TEST_ROOT "/local-1", username, "job_4");
+ if (access(job_dir, R_OK) != 0) {
+ printf("FAIL: failed to create job directory %s\n", job_dir);
+ exit(1);
+ }
+ char buffer[100000];
+ sprintf(buffer, "%s/jobToken", job_dir);
+ if (access(buffer, R_OK) != 0) {
+ printf("FAIL: failed to create credentials %s\n", buffer);
+ exit(1);
+ }
+ sprintf(buffer, "%s/my-touch-file", job_dir);
+ if (access(buffer, R_OK) != 0) {
+ printf("FAIL: failed to create touch file %s\n", buffer);
+ exit(1);
+ }
+ free(job_dir);
+ job_dir = get_job_log_directory("job_4");
+ if (access(job_dir, R_OK) != 0) {
+ printf("FAIL: failed to create job log directory %s\n", job_dir);
+ exit(1);
+ }
+ free(job_dir);
+}
+
+void test_run_task() {
+ printf("\nTesting run task\n");
+ if (seteuid(0) != 0) {
+ printf("FAIL: seteuid to root failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ const char* script_name = TEST_ROOT "/task-script";
+ FILE* script = fopen(script_name, "w");
+ if (script == NULL) {
+ printf("FAIL: failed to create script file - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (seteuid(user_detail->pw_uid) != 0) {
+ printf("FAIL: failed to seteuid back to user - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (fprintf(script, "#!/bin/bash\n"
+ "touch foobar\n"
+ "exit 0") < 0) {
+ printf("FAIL: fprintf failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ if (fclose(script) != 0) {
+ printf("FAIL: fclose failed - %s\n", strerror(errno));
+ exit(1);
+ }
+ fflush(stdout);
+ fflush(stderr);
+ char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-1",
+ username, "job_4", "task_1");
+ pid_t child = fork();
+ if (child == -1) {
+ printf("FAIL: failed to fork process for init_job - %s\n",
+ strerror(errno));
+ exit(1);
+ } else if (child == 0) {
+ if (run_task_as_user(username, "job_4", "task_1",
+ task_dir, script_name) != 0) {
+ printf("FAIL: failed in child\n");
+ exit(42);
+ }
+ // should never return
+ exit(1);
+ }
+ int status = 0;
+ if (waitpid(child, &status, 0) <= 0) {
+ printf("FAIL: failed waiting for process %d - %s\n", child,
+ strerror(errno));
+ exit(1);
+ }
+ if (access(TEST_ROOT "/logs/userlogs/job_4/task_1", R_OK) != 0) {
+ printf("FAIL: failed to create task log directory\n");
+ exit(1);
+ }
+ if (access(task_dir, R_OK) != 0) {
+ printf("FAIL: failed to create task directory %s\n", task_dir);
+ exit(1);
+ }
+ char buffer[100000];
+ sprintf(buffer, "%s/foobar", task_dir);
+ if (access(buffer, R_OK) != 0) {
+ printf("FAIL: failed to create touch file %s\n", buffer);
+ exit(1);
+ }
+ free(task_dir);
+ task_dir = get_job_log_directory("job_4/task_1");
+ if (access(task_dir, R_OK) != 0) {
+ printf("FAIL: failed to create job log directory %s\n", task_dir);
+ exit(1);
+ }
+ free(task_dir);
+}
+
+int main(int argc, char **argv) {
+ LOGFILE = stdout;
+ int my_username = 0;
+
+ // clean up any junk from previous run
+ system("chmod -R u=rwx " TEST_ROOT "; rm -fr " TEST_ROOT);
+
+ if (mkdirs(TEST_ROOT "/logs/userlogs", 0755) != 0) {
+ exit(1);
+ }
+
+ if (write_config_file(TEST_ROOT "/test.cfg") != 0) {
+ exit(1);
+ }
+ read_config(TEST_ROOT "/test.cfg");
+
+ create_tt_roots();
+
+ if (getuid() == 0 && argc == 2) {
+ username = argv[1];
+ } else {
+ username = strdup(getpwuid(getuid())->pw_name);
+ my_username = 1;
+ }
+ set_tasktracker_uid(geteuid(), getegid());
+
+ if (set_user(username)) {
+ exit(1);
+ }
+
+ printf("\nStarting tests\n");
+
+ printf("\nTesting get_user_directory()\n");
+ test_get_user_directory();
+
+ printf("\nTesting get_job_directory()\n");
+ test_get_job_directory();
+
+ printf("\nTesting get_attempt_directory()\n");
+ test_get_attempt_directory();
+
+ printf("\nTesting get_task_launcher_file()\n");
+ test_get_task_launcher_file();
+
+ printf("\nTesting get_job_log_dir()\n");
+ test_get_job_log_dir();
+
+ test_check_configuration_permissions();
+
+ printf("\nTesting get_task_log_dir()\n");
+ test_get_task_log_dir();
+
+ printf("\nTesting delete_task()\n");
+ test_delete_task();
+
+ printf("\nTesting delete_job()\n");
+ test_delete_job();
+
+ test_delete_user();
+
+ test_check_user();
+
+ test_delete_log_directory();
+
+ // the tests that change user need to be run in a subshell, so that
+ // when they change user they don't give up our privs
+ run_test_in_child("test_signal_task", test_signal_task);
+ run_test_in_child("test_signal_task_group", test_signal_task_group);
+
+ // init job and run task can't be run if you aren't testing as root
+ if (getuid() == 0) {
+ // these tests do internal forks so that the change_owner and execs
+ // don't mess up our process.
+ test_init_job();
+ test_run_task();
+ }
+
+ seteuid(0);
+ run("rm -fr " TEST_ROOT);
+ printf("\nFinished tests\n");
+
+ if (my_username) {
+ free(username);
+ }
+ free_configurations();
+ return 0;
+}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/fairscheduler/designdoc/fair_scheduler_design_doc.pdf
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/fairscheduler/designdoc/fair_scheduler_design_doc.pdf?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
Binary files - no diff available.
Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Mar 8 05:56:27 2011
@@ -220,7 +220,7 @@ public abstract class PipeMapRed {
} catch (IOException e) {
LOG.error("configuration exception", e);
throw new RuntimeException("configuration exception", e);
- } catch (InterruptedException e) {
+ } catch (InterruptedException e) {
LOG.error("configuration exception", e);
throw new RuntimeException("configuration exception", e);
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Child.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Child.java Tue Mar 8 05:56:27 2011
@@ -24,11 +24,14 @@ import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.TaskType;
@@ -55,6 +58,7 @@ class Child {
static volatile TaskAttemptID taskid = null;
static volatile boolean isCleanup;
+ static String cwd;
public static void main(String[] args) throws Throwable {
LOG.debug("Child starting");
@@ -76,6 +80,11 @@ class Child {
DefaultMetricsSystem.initialize(
StringUtils.camelize(firstTaskid.getTaskType().name()) +"Task");
+ cwd = System.getenv().get(TaskRunner.HADOOP_WORK_DIR);
+ if (cwd == null) {
+ throw new IOException("Environment variable " +
+ TaskRunner.HADOOP_WORK_DIR + " is not set");
+ }
//load token cache storage
String jobTokenFile =
System.getenv().get(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
@@ -177,10 +186,6 @@ class Child {
isCleanup = task.isTaskCleanupTask();
// reset the statistics for the task
FileSystem.clearStatistics();
-
- //create the index file so that the log files
- //are viewable immediately
- TaskLog.syncLogs(logLocation, taskid, isCleanup);
// Create the job-conf and set credentials
final JobConf job = new JobConf(task.getJobFile());
@@ -193,12 +198,19 @@ class Child {
// setup the child's Configs.LOCAL_DIR. The child is now sandboxed and
// can only see files down and under attemtdir only.
TaskRunner.setupChildMapredLocalDirs(task, job);
+
+ // setup the child's attempt directories
+ localizeTask(task, job, logLocation);
//setupWorkDir actually sets up the symlinks for the distributed
//cache. After a task exits we wipe the workdir clean, and hence
//the symlinks have to be rebuilt.
- TaskRunner.setupWorkDir(job, new File(".").getAbsoluteFile());
-
+ TaskRunner.setupWorkDir(job, new File(cwd));
+
+ //create the index file so that the log files
+ //are viewable immediately
+ TaskLog.syncLogs(logLocation, taskid, isCleanup);
+
numTasksToExecute = job.getNumTasksToExecutePerJvm();
assert(numTasksToExecute != 0);
@@ -284,4 +296,19 @@ class Child {
LogManager.shutdown();
}
}
+ static void localizeTask(Task task, JobConf jobConf, String logLocation)
+ throws IOException{
+
+ // Do the task-type specific localization
+ task.localizeConfiguration(jobConf);
+
+ //write the localized task jobconf
+ LocalDirAllocator lDirAlloc =
+ new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
+ Path localTaskFile =
+ lDirAlloc.getLocalPathForWrite(TaskTracker.JOBFILE, jobConf);
+ JobLocalizer.writeLocalJobFile(localTaskFile, jobConf);
+ task.setJobFile(localTaskFile.toString());
+ task.setConf(jobConf);
+ }
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CleanupQueue.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CleanupQueue.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CleanupQueue.java Tue Mar 8 05:56:27 2011
@@ -19,20 +19,26 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
class CleanupQueue {
public static final Log LOG =
LogFactory.getLog(CleanupQueue.class);
- private static PathCleanupThread cleanupThread;
+ private static final PathCleanupThread cleanupThread =
+ new PathCleanupThread();
+ private static final CleanupQueue inst = new CleanupQueue();
+
+ public static CleanupQueue getInstance() { return inst; }
/**
* Create a singleton path-clean-up queue. It can be used to delete
@@ -42,59 +48,53 @@ class CleanupQueue {
* {@link CleanupQueue#addToQueue(PathDeletionContext...)} to add paths for
* deletion.
*/
- public CleanupQueue() {
- synchronized (PathCleanupThread.class) {
- if (cleanupThread == null) {
- cleanupThread = new PathCleanupThread();
- }
- }
- }
+ protected CleanupQueue() { }
/**
* Contains info related to the path of the file/dir to be deleted
*/
static class PathDeletionContext {
- String fullPath;// full path of file or dir
- FileSystem fs;
+ final Path fullPath;// full path of file or dir
+ final Configuration conf;
- public PathDeletionContext(FileSystem fs, String fullPath) {
- this.fs = fs;
+ public PathDeletionContext(Path fullPath, Configuration conf) {
this.fullPath = fullPath;
+ this.conf = conf;
}
- protected String getPathForCleanup() {
+ protected Path getPathForCleanup() {
return fullPath;
}
/**
- * Makes the path(and its subdirectories recursively) fully deletable
+ * Deletes the path (and its subdirectories recursively)
+ * @throws IOException, InterruptedException
*/
- protected void enablePathForCleanup() throws IOException {
- // Do nothing by default.
- // Subclasses can override to provide enabling for deletion.
+ protected void deletePath() throws IOException, InterruptedException {
+ final Path p = getPathForCleanup();
+ UserGroupInformation.getLoginUser().doAs(
+ new PrivilegedExceptionAction<Object>() {
+ public Object run() throws IOException {
+ p.getFileSystem(conf).delete(p, true);
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public String toString() {
+ final Path p = getPathForCleanup();
+ return (null == p) ? "undefined" : p.toString();
}
}
/**
* Adds the paths to the queue of paths to be deleted by cleanupThread.
*/
- void addToQueue(PathDeletionContext... contexts) {
+ public void addToQueue(PathDeletionContext... contexts) {
cleanupThread.addToQueue(contexts);
}
- protected static boolean deletePath(PathDeletionContext context)
- throws IOException {
- context.enablePathForCleanup();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to delete " + context.fullPath);
- }
- if (context.fs.exists(new Path(context.fullPath))) {
- return context.fs.delete(new Path(context.fullPath), true);
- }
- return true;
- }
-
// currently used by tests only
protected boolean isQueueEmpty() {
return (cleanupThread.queue.size() == 0);
@@ -128,18 +128,16 @@ class CleanupQueue {
while (true) {
try {
context = queue.take();
+ context.deletePath();
// delete the path.
- if (!deletePath(context)) {
- LOG.warn("CleanupThread:Unable to delete path " + context.fullPath);
- }
- else if (LOG.isDebugEnabled()) {
- LOG.debug("DELETED " + context.fullPath);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DELETED " + context);
}
} catch (InterruptedException t) {
- LOG.warn("Interrupted deletion of " + context.fullPath);
+ LOG.warn("Interrupted deletion of " + context);
return;
- } catch (Exception e) {
- LOG.warn("Error deleting path " + context.fullPath + ": " + e);
+ } catch (Throwable e) {
+ LOG.warn("Error deleting path " + context, e);
}
}
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/DefaultTaskController.java Tue Mar 8 05:56:27 2011
@@ -14,25 +14,27 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
+*/
package org.apache.hadoop.mapred;
+import java.io.File;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapreduce.util.ProcessTree;
-import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
/**
* The default implementation for controlling tasks.
@@ -43,187 +45,231 @@ import org.apache.hadoop.fs.Path;
*
* <br/>
*
+ * NOTE: This class is internal only class and not intended for users!!
*/
-@InterfaceAudience.Private
public class DefaultTaskController extends TaskController {
private static final Log LOG =
LogFactory.getLog(DefaultTaskController.class);
- /**
- * Launch a new JVM for the task.
- *
- * This method launches the new JVM for the task by executing the
- * the JVM command using the {@link Shell.ShellCommandExecutor}
- */
- void launchTaskJVM(TaskController.TaskControllerContext context)
- throws IOException {
- initializeTask(context);
-
- JvmEnv env = context.env;
- List<String> wrappedCommand =
- TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
- env.logSize, true);
- ShellCommandExecutor shexec =
- new ShellCommandExecutor(wrappedCommand.toArray(new String[0]),
- env.workDir, env.env);
- // set the ShellCommandExecutor for later use.
- context.shExec = shexec;
- shexec.execute();
- }
-
- /**
- * Initialize the task environment.
- *
- * Since tasks are launched as the tasktracker user itself, this
- * method has no action to perform.
- */
- void initializeTask(TaskController.TaskControllerContext context) {
- // The default task controller does not need to set up
- // any permissions for proper execution.
- // So this is a dummy method.
- return;
- }
-
- /*
- * No need to do anything as we don't need to do as we dont need anything
- * extra from what TaskTracker has done.
- */
+ private FileSystem fs;
@Override
- void initializeJob(JobInitializationContext context) {
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ try {
+ fs = FileSystem.getLocal(conf).getRaw();
+ } catch (IOException ie) {
+ throw new RuntimeException("Failed getting LocalFileSystem", ie);
+ }
}
+ /**
+ * Create all of the directories for the task and launches the child jvm.
+ * @param user the user name
+ * @param attemptId the attempt id
+ * @throws IOException
+ */
@Override
- void terminateTask(TaskControllerContext context) {
- ShellCommandExecutor shexec = context.shExec;
- if (shexec != null) {
- Process process = shexec.getProcess();
- if (Shell.WINDOWS) {
- // Currently we don't use setsid on WINDOWS.
- //So kill the process alone.
- if (process != null) {
- process.destroy();
- }
- }
- else { // In addition to the task JVM, kill its subprocesses also.
- String pid = context.pid;
- if (pid != null) {
- if(ProcessTree.isSetsidAvailable) {
- ProcessTree.terminateProcessGroup(pid);
- }else {
- ProcessTree.terminateProcess(pid);
- }
- }
+ public int launchTask(String user,
+ String jobId,
+ String attemptId,
+ List<String> setup,
+ List<String> jvmArguments,
+ File currentWorkDirectory,
+ String stdout,
+ String stderr) throws IOException {
+
+ ShellCommandExecutor shExec = null;
+ try {
+ FileSystem localFs = FileSystem.getLocal(getConf());
+
+ //create the attempt dirs
+ new Localizer(localFs,
+ getConf().getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY)).
+ initializeAttemptDirs(user, jobId, attemptId);
+
+ // create the working-directory of the task
+ if (!currentWorkDirectory.mkdir()) {
+ throw new IOException("Mkdirs failed to create "
+ + currentWorkDirectory.toString());
}
- }
- }
-
- @Override
- void killTask(TaskControllerContext context) {
- ShellCommandExecutor shexec = context.shExec;
- if (shexec != null) {
- if (Shell.WINDOWS) {
- //We don't do send kill process signal in case of windows as
- //already we have done a process.destroy() in terminateTaskJVM()
- return;
+
+ //mkdir the loglocation
+ String logLocation = TaskLog.getAttemptDir(jobId, attemptId).toString();
+ if (!localFs.mkdirs(new Path(logLocation))) {
+ throw new IOException("Mkdirs failed to create "
+ + logLocation);
}
- String pid = context.pid;
- if (pid != null) {
- if(ProcessTree.isSetsidAvailable) {
- ProcessTree.killProcessGroup(pid);
- } else {
- ProcessTree.killProcess(pid);
- }
+ //read the configuration for the job
+ FileSystem rawFs = FileSystem.getLocal(getConf()).getRaw();
+ long logSize = 0; //TODO: Ref BUG:2854624
+ // get the JVM command line.
+ String cmdLine =
+ TaskLog.buildCommandLine(setup, jvmArguments,
+ new File(stdout), new File(stderr), logSize, true);
+
+ // write the command to a file in the
+ // task specific cache directory
+ // TODO copy to user dir
+ Path p = new Path(allocator.getLocalPathForWrite(
+ TaskTracker.getPrivateDirTaskScriptLocation(user, jobId, attemptId),
+ getConf()), COMMAND_FILE);
+
+ String commandFile = writeCommand(cmdLine, rawFs, p);
+ rawFs.setPermission(p, TaskController.TASK_LAUNCH_SCRIPT_PERMISSION);
+ shExec = new ShellCommandExecutor(new String[]{
+ "bash", "-c", commandFile},
+ currentWorkDirectory);
+ shExec.execute();
+ } catch (Exception e) {
+ if (shExec == null) {
+ return -1;
}
+ int exitCode = shExec.getExitCode();
+ LOG.warn("Exit code from task is : " + exitCode);
+ LOG.info("Output from DefaultTaskController's launchTask follows:");
+ logOutput(shExec.getOutput());
+ return exitCode;
}
+ return 0;
}
-
+
+ /**
+ * This routine initializes the local file system for running a job.
+ * Details:
+ * <ul>
+ * <li>Copies the credentials file from the TaskTracker's private space to
+ * the job's private space </li>
+ * <li>Creates the job work directory and set
+ * {@link TaskTracker#JOB_LOCAL_DIR} in the configuration</li>
+ * <li>Downloads the job.jar, unjars it, and updates the configuration to
+ * reflect the localized path of the job.jar</li>
+ * <li>Creates a base JobConf in the job's private space</li>
+ * <li>Sets up the distributed cache</li>
+ * <li>Sets up the user logs directory for the job</li>
+ * </ul>
+ * This method must be invoked in the access control context of the job owner
+ * user. This is because the distributed cache is also setup here and the
+ * access to the hdfs files requires authentication tokens in case where
+ * security is enabled.
+ * @param user the user in question (the job owner)
+ * @param jobid the ID of the job in question
+ * @param credentials the path to the credentials file that the TaskTracker
+ * downloaded
+ * @param jobConf the path to the job configuration file that the TaskTracker
+ * downloaded
+ * @param taskTracker the connection to the task tracker
+ * @throws IOException
+ * @throws InterruptedException
+ */
@Override
- void dumpTaskStack(TaskControllerContext context) {
- ShellCommandExecutor shexec = context.shExec;
- if (shexec != null) {
- if (Shell.WINDOWS) {
- // We don't use signals in Windows.
- return;
- }
- String pid = context.pid;
- if (pid != null) {
- // Send SIGQUIT to get a stack dump
- if (ProcessTree.isSetsidAvailable) {
- ProcessTree.sigQuitProcessGroup(pid);
- } else {
- ProcessTree.sigQuitProcess(pid);
- }
- }
- }
+ public void initializeJob(String user, String jobid,
+ Path credentials, Path jobConf,
+ TaskUmbilicalProtocol taskTracker,
+ InetSocketAddress ttAddr
+ ) throws IOException, InterruptedException {
+ final LocalDirAllocator lDirAlloc = allocator;
+ FileSystem localFs = FileSystem.getLocal(getConf());
+ JobLocalizer localizer = new JobLocalizer((JobConf)getConf(), user, jobid);
+ localizer.createLocalDirs();
+ localizer.createUserDirs();
+ localizer.createJobDirs();
+
+ JobConf jConf = new JobConf(jobConf);
+ localizer.createWorkDir(jConf);
+ //copy the credential file
+ Path localJobTokenFile = lDirAlloc.getLocalPathForWrite(
+ TaskTracker.getLocalJobTokenFile(user, jobid), getConf());
+ FileUtil.copy(
+ localFs, credentials, localFs, localJobTokenFile, false, getConf());
+
+
+ //setup the user logs dir
+ localizer.initializeJobLogDir();
+
+ // Download the job.jar for this job from the system FS
+ // setup the distributed cache
+ // write job acls
+ // write localized config
+ localizer.localizeJobFiles(JobID.forName(jobid), jConf, localJobTokenFile,
+ taskTracker);
}
@Override
- public void initializeDistributedCacheFile(DistributedCacheFileContext context)
+ public boolean signalTask(String user, int taskPid, Signal signal)
throws IOException {
- Path localizedUniqueDir = context.getLocalizedUniqueDir();
+ final int sigpid = TaskController.isSetsidAvailable
+ ? -1 * taskPid
+ : taskPid;
try {
- // Setting recursive execute permission on localized dir
- LOG.info("Doing chmod on localdir :" + localizedUniqueDir);
- FileUtil.chmod(localizedUniqueDir.toString(), "+x", true);
- } catch (InterruptedException ie) {
- LOG.warn("Exception in doing chmod on" + localizedUniqueDir, ie);
- throw new IOException(ie);
+ sendSignal(sigpid, Signal.NULL);
+ } catch (ExitCodeException e) {
+ return false;
}
+ try {
+ sendSignal(sigpid, signal);
+ } catch (IOException e) {
+ try {
+ sendSignal(sigpid, Signal.NULL);
+ } catch (IOException ignore) {
+ return false;
+ }
+ throw e;
+ }
+ return true;
}
- @Override
- public void initializeUser(InitializationContext context) {
- // Do nothing.
- }
-
- @Override
- void runDebugScript(DebugScriptContext context) throws IOException {
- List<String> wrappedCommand = TaskLog.captureDebugOut(context.args,
- context.stdout);
- // run the script.
- ShellCommandExecutor shexec =
- new ShellCommandExecutor(wrappedCommand.toArray(new String[0]), context.workDir);
+ /**
+ * Send a specified signal to the specified pid
+ *
+ * @param pid the pid of the process [group] to signal.
+ * @param signal signal to send
+ * (for logging).
+ */
+ protected void sendSignal(int pid, Signal signal) throws IOException {
+ ShellCommandExecutor shexec = null;
+ String[] arg = { "kill", "-" + signal.getValue(), Integer.toString(pid) };
+ shexec = new ShellCommandExecutor(arg);
shexec.execute();
- int exitCode = shexec.getExitCode();
- if (exitCode != 0) {
- throw new IOException("Task debug script exit with nonzero status of "
- + exitCode + ".");
- }
}
/**
- * Enables the task for cleanup by changing permissions of the specified path
- * in the local filesystem
+ * Delete the user's files under all of the task tracker root directories.
+ * @param user the user name
+ * @param subDir the path relative to base directories
+ * @param baseDirs the base directories (absolute paths)
+ * @throws IOException
*/
@Override
- void enableTaskForCleanup(PathDeletionContext context)
- throws IOException {
- enablePathForCleanup(context);
+ public void deleteAsUser(String user,
+ String subDir,
+ String... baseDirs) throws IOException {
+ if (baseDirs == null || baseDirs.length == 0) {
+ LOG.info("Deleting absolute path : " + subDir);
+ fs.delete(new Path(subDir), true);
+ return;
+ }
+ for (String baseDir : baseDirs) {
+ LOG.info("Deleting path : " + baseDir + Path.SEPARATOR + subDir);
+ fs.delete(new Path(baseDir + Path.SEPARATOR + subDir), true);
+ }
}
/**
- * Enables the job for cleanup by changing permissions of the specified path
- * in the local filesystem
+ * Delete the user's files under the userlogs directory.
+ * @param user the user to work as
+ * @param subDir the path under the userlogs directory.
+ * @throws IOException
*/
@Override
- void enableJobForCleanup(PathDeletionContext context)
- throws IOException {
- enablePathForCleanup(context);
+ public void deleteLogAsUser(String user,
+ String subDir) throws IOException {
+ Path dir = new Path(TaskLog.getUserLogDir().getAbsolutePath(), subDir);
+ fs.delete(dir, true);
}
-
- /**
- * Enables the path for cleanup by changing permissions of the specified path
- * in the local filesystem
- */
- private void enablePathForCleanup(PathDeletionContext context)
- throws IOException {
- try {
- FileUtil.chmod(context.fullPath, "u+rwx", true);
- } catch(InterruptedException e) {
- LOG.warn("Interrupted while setting permissions for " + context.fullPath +
- " for deletion.");
- } catch(IOException ioe) {
- LOG.warn("Unable to change permissions of " + context.fullPath);
- }
+
+ @Override
+ public void setup(LocalDirAllocator allocator) {
+ this.allocator = allocator;
}
+
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/IsolationRunner.java Tue Mar 8 05:56:27 2011
@@ -121,6 +121,13 @@ public class IsolationRunner {
SortedRanges.Range range) throws IOException {
LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
}
+
+ @Override
+ public void
+ updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId,
+ long[] sizes){
+ // NOTHING
+ }
}
private ClassLoader makeClassLoader(JobConf conf,
@@ -181,9 +188,15 @@ public class IsolationRunner {
// setup the local and user working directories
FileSystem local = FileSystem.getLocal(conf);
LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
+ Path workDirName;
+ boolean workDirExists = lDirAlloc.ifExists(MRConstants.WORKDIR, conf);
+ if (workDirExists) {
+ workDirName = TaskRunner.formWorkDir(lDirAlloc, conf);
+ } else {
+ workDirName = lDirAlloc.getLocalPathForWrite(MRConstants.WORKDIR, conf);
+ }
- File workDirName = TaskRunner.formWorkDir(lDirAlloc, taskId, false, conf);
- local.setWorkingDirectory(new Path(workDirName.toString()));
+ local.setWorkingDirectory(workDirName);
FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
// set up a classloader with the right classpath
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Mar 8 05:56:27 2011
@@ -477,6 +477,11 @@ public class JobInProgress {
this.submitHostName = conf.getJobSubmitHostName();
this.submitHostAddress = conf.getJobSubmitHostAddress();
+ this.nonLocalMaps = new LinkedList<TaskInProgress>();
+ this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+ this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+ this.nonRunningReduces = new LinkedList<TaskInProgress>();
+ this.runningReduces = new LinkedHashSet<TaskInProgress>();
this.slowTaskThreshold = Math.max(0.0f, conf.getFloat(
MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f));
this.speculativeCap = conf.getFloat(MRJobConfig.SPECULATIVECAP, 0.1f);
@@ -3451,8 +3456,8 @@ public class JobInProgress {
}
Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
- new CleanupQueue().addToQueue(new PathDeletionContext(
- jobtracker.getFileSystem(), tempDir.toUri().getPath()));
+ CleanupQueue.getInstance().addToQueue(
+ new PathDeletionContext(tempDir, conf));
} catch (IOException e) {
LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
}
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobLocalizer.java?rev=1079211&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobLocalizer.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobLocalizer.java Tue Mar 8 05:56:27 2011
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.RunJar;
+
+/**
+ * Internal class responsible for initializing the job, not intended for users.
+ * Creates the following hierarchy:
+ * <li>$mapred.local.dir/taskTracker/$user</li>
+ * <li>$mapred.local.dir/taskTracker/$user/jobcache</li>
+ * <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/work</li>
+ * <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/jars</li>
+ * <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/jars/job.jar</li>
+ * <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/job.xml</li>
+ * <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/jobToken</li>
+ * <li>$mapred.local.dir/taskTracker/$user/distcache</li>
+ */
+public class JobLocalizer {
+
+ static final Log LOG = LogFactory.getLog(JobLocalizer.class);
+
+ private static final FsPermission urwx =
+ FsPermission.createImmutable((short) 0700);
+ private static final FsPermission urwx_gx =
+ FsPermission.createImmutable((short) 0710);
+ private static final FsPermission urw_gr =
+ FsPermission.createImmutable((short) 0640);
+
+ private final String user;
+ private final String jobid;
+ private final FileSystem lfs;
+ private final List<Path> localDirs;
+ private final LocalDirAllocator lDirAlloc;
+ private final JobConf ttConf;
+
+ private final String JOBDIR;
+ private final String DISTDIR;
+ private final String WORKDIR;
+ private final String JARDST;
+ private final String JOBCONF;
+ private final String JOBTOKEN;
+ private static final String JOB_LOCAL_CTXT = "mapred.job.local.dir";
+
+ public JobLocalizer(JobConf ttConf, String user, String jobid)
+ throws IOException {
+ this(ttConf, user, jobid,
+ ttConf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+ }
+
+ public JobLocalizer(JobConf ttConf, String user, String jobid,
+ String... localDirs) throws IOException {
+ if (null == user) {
+ throw new IOException("Cannot initialize for null user");
+ }
+ this.user = user;
+ if (null == jobid) {
+ throw new IOException("Cannot initialize for null jobid");
+ }
+ this.jobid = jobid;
+ this.ttConf = ttConf;
+ lfs = FileSystem.getLocal(ttConf).getRaw();
+ this.localDirs = createPaths(user, localDirs);
+ ttConf.setStrings(JOB_LOCAL_CTXT, localDirs);
+ Collections.shuffle(this.localDirs);
+ lDirAlloc = new LocalDirAllocator(JOB_LOCAL_CTXT);
+ JOBDIR = TaskTracker.JOBCACHE + Path.SEPARATOR + jobid;
+ DISTDIR = JOBDIR + "/" + TaskTracker.DISTCACHEDIR;
+ WORKDIR = JOBDIR + "/work";
+ JARDST = JOBDIR + "/" + TaskTracker.JARSDIR + "/job.jar";
+ JOBCONF = JOBDIR + "/" + TaskTracker.JOBFILE;
+ JOBTOKEN = JOBDIR + "/" + TaskTracker.JOB_TOKEN_FILE;
+ }
+
+ private static List<Path> createPaths(String user, final String[] str)
+ throws IOException {
+ if (null == str || 0 == str.length) {
+ throw new IOException("mapred.local.dir contains no entries");
+ }
+ final List<Path> ret = new ArrayList<Path>(str.length);
+ for (int i = 0; i < str.length; ++i) {
+ final Path p = new Path(str[i], TaskTracker.getUserDir(user));
+ ret.add(p);
+ str[i] = p.toString();
+ }
+ return ret;
+ }
+
+ public void createLocalDirs() throws IOException {
+ boolean userDirStatus = false;
+ // create all directories as rwx------
+ for (Path localDir : localDirs) {
+ // create $mapred.local.dir/taskTracker/$user
+ if (!lfs.mkdirs(localDir, urwx)) {
+ LOG.warn("Unable to create the user directory : " + localDir);
+ continue;
+ }
+ userDirStatus = true;
+ }
+ if (!userDirStatus) {
+ throw new IOException("Not able to initialize user directories "
+ + "in any of the configured local directories for user " + user);
+ }
+ }
+
+ /**
+ * Initialize the local directories for a particular user on this TT. This
+ * involves creation and setting permissions of the following directories
+ * <ul>
+ * <li>$mapred.local.dir/taskTracker/$user</li>
+ * <li>$mapred.local.dir/taskTracker/$user/jobcache</li>
+ * <li>$mapred.local.dir/taskTracker/$user/distcache</li>
+ * </ul>
+ */
+ public void createUserDirs() throws IOException {
+ LOG.info("Initializing user " + user + " on this TT.");
+
+ boolean jobCacheDirStatus = false;
+ boolean distributedCacheDirStatus = false;
+
+ // create all directories as rwx------
+ for (Path localDir : localDirs) {
+ // create $mapred.local.dir/taskTracker/$user/jobcache
+ final Path jobDir =
+ new Path(localDir, TaskTracker.JOBCACHE);
+ if (!lfs.mkdirs(jobDir, urwx)) {
+ LOG.warn("Unable to create job cache directory : " + jobDir);
+ } else {
+ jobCacheDirStatus = true;
+ }
+ // create $mapred.local.dir/taskTracker/$user/distcache
+ final Path distDir =
+ new Path(localDir, TaskTracker.DISTCACHEDIR);
+ if (!lfs.mkdirs(distDir, urwx)) {
+ LOG.warn("Unable to create distributed-cache directory : " + distDir);
+ } else {
+ distributedCacheDirStatus = true;
+ }
+ }
+ if (!jobCacheDirStatus) {
+ throw new IOException("Not able to initialize job-cache directories "
+ + "in any of the configured local directories for user " + user);
+ }
+ if (!distributedCacheDirStatus) {
+ throw new IOException(
+ "Not able to initialize distributed-cache directories "
+ + "in any of the configured local directories for user "
+ + user);
+ }
+ }
+
+ /**
+ * Prepare the job directories for a given job. To be called by the job
+ * localization code, only if the job is not already localized.
+ * <br>
+ * Here, we set 700 permissions on the job directories created on all disks.
+ * This we do so as to avoid any misuse by other users till the time
+ * {@link TaskController#initializeJob} is run at a
+ * later time to set proper private permissions on the job directories. <br>
+ */
+ public void createJobDirs() throws IOException {
+ boolean initJobDirStatus = false;
+ for (Path localDir : localDirs) {
+ Path fullJobDir = new Path(localDir, JOBDIR);
+ if (lfs.exists(fullJobDir)) {
+ // this will happen on a partial execution of localizeJob. Sometimes
+ // copying job.xml to the local disk succeeds but copying job.jar might
+ // throw out an exception. We should clean up and then try again.
+ lfs.delete(fullJobDir, true);
+ }
+ // create $mapred.local.dir/taskTracker/$user/jobcache/$jobid
+ if (!lfs.mkdirs(fullJobDir, urwx)) {
+ LOG.warn("Not able to create job directory " + fullJobDir.toString());
+ } else {
+ initJobDirStatus = true;
+ }
+ }
+ if (!initJobDirStatus) {
+ throw new IOException("Not able to initialize job directories "
+ + "in any of the configured local directories for job "
+ + jobid.toString());
+ }
+ }
+
+ /**
+ * Create job log directory and set appropriate permissions for the directory.
+ */
+ public void initializeJobLogDir() throws IOException {
+ Path jobUserLogDir = new Path(TaskLog.getJobDir(jobid).toURI().toString());
+ if (!lfs.mkdirs(jobUserLogDir, urwx_gx)) {
+ throw new IOException(
+ "Could not create job user log directory: " + jobUserLogDir);
+ }
+ }
+
+ /**
+ * Download the job jar file from FS to the local file system and unjar it.
+ * Set the local jar file in the passed configuration.
+ *
+ * @param localJobConf
+ * @throws IOException
+ */
+ private void localizeJobJarFile(JobConf localJobConf)
+ throws IOException, InterruptedException {
+ // copy Jar file to the local FS and unjar it.
+ String jarFile = localJobConf.getJar();
+ FileStatus status = null;
+ long jarFileSize = -1;
+ if (jarFile != null) {
+ Path jarFilePath = new Path(jarFile);
+ FileSystem userFs = jarFilePath.getFileSystem(localJobConf);
+ try {
+ status = userFs.getFileStatus(jarFilePath);
+ jarFileSize = status.getLen();
+ } catch (FileNotFoundException fe) {
+ jarFileSize = -1;
+ }
+ // Here we check for five times the size of jarFileSize to accommodate for
+ // unjarring the jar file in the jars directory
+ Path localJarFile =
+ lDirAlloc.getLocalPathForWrite(JARDST, 5 * jarFileSize, ttConf);
+
+ //Download job.jar
+ userFs.copyToLocalFile(jarFilePath, localJarFile);
+ localJobConf.setJar(localJarFile.toString());
+ // Also un-jar the job.jar files. We un-jar it so that classes inside
+ // sub-directories, for e.g., lib/, classes/ are available on class-path
+ RunJar.unJar(new File(localJarFile.toString()),
+ new File(localJarFile.getParent().toString()));
+ FileUtil.chmod(localJarFile.getParent().toString(), "ugo+rx", true);
+ }
+ }
+
+ /**
+ * The permissions to use for the private distributed cache objects.
+ * It is already protected by the user directory, so keep the group and other
+ * the same so that LocalFileSystem will use the java File methods to
+ * set permission.
+ */
+ private static final FsPermission privateCachePerms =
+ FsPermission.createImmutable((short) 0755);
+
+ /**
+ * Given a list of objects, download each one.
+ * @param conf the job's configuration
+ * @param sources the list of objects to download from
+ * @param dests the list of paths to download them to
+ * @param times the desired modification times
+ * @param isPublic are the objects in the public cache?
+ * @param isArchive are these archive files?
+ * @throws IOException
+ * @return for archives, return the list of each of the sizes.
+ */
+ private static long[] downloadPrivateCacheObjects(Configuration conf,
+ URI[] sources,
+ Path[] dests,
+ long[] times,
+ boolean[] isPublic,
+ boolean isArchive
+ ) throws IOException,
+ InterruptedException {
+ if (null == sources && null == dests && null == times && null == isPublic) {
+ return null;
+ }
+ if (sources.length != dests.length ||
+ sources.length != times.length ||
+ sources.length != isPublic.length) {
+ throw new IOException("Distributed cache entry arrays have different " +
+ "lengths: " + sources.length + ", " + dests.length +
+ ", " + times.length + ", " + isPublic.length);
+ }
+ long[] result = new long[sources.length];
+ for(int i=0; i < sources.length; i++) {
+ // public objects are already downloaded by the Task Tracker, we
+ // only need to handle the private ones here
+ if (!isPublic[i]) {
+ result[i] =
+ TrackerDistributedCacheManager.downloadCacheObject(conf, sources[i],
+ dests[i],
+ times[i],
+ isArchive,
+ privateCachePerms);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Download the parts of the distributed cache that are private.
+ * @param conf the job's configuration
+ * @throws IOException
+ * @return the size of the archive objects
+ */
+ public static long[] downloadPrivateCache(Configuration conf)
+ throws InterruptedException, IOException {
+ downloadPrivateCacheObjects(conf,
+ DistributedCache.getCacheFiles(conf),
+ DistributedCache.getLocalCacheFiles(conf),
+ DistributedCache.getFileTimestamps(conf),
+ TrackerDistributedCacheManager.
+ getFileVisibilities(conf),
+ false);
+ return
+ downloadPrivateCacheObjects(conf,
+ DistributedCache.getCacheArchives(conf),
+ DistributedCache.getLocalCacheArchives(conf),
+ DistributedCache.getArchiveTimestamps(conf),
+ TrackerDistributedCacheManager.
+ getArchiveVisibilities(conf),
+ true);
+ }
+
+ public void localizeJobFiles(JobID jobid, JobConf jConf,
+ Path localJobTokenFile, TaskUmbilicalProtocol taskTracker)
+ throws IOException, InterruptedException {
+ localizeJobFiles(jobid, jConf,
+ lDirAlloc.getLocalPathForWrite(JOBCONF, ttConf), localJobTokenFile,
+ taskTracker);
+ }
+
+ public void localizeJobFiles(final JobID jobid, JobConf jConf,
+ Path localJobFile, Path localJobTokenFile,
+ final TaskUmbilicalProtocol taskTracker)
+ throws IOException, InterruptedException {
+ // Download the job.jar for this job from the system FS
+ localizeJobJarFile(jConf);
+
+ jConf.set(JOB_LOCAL_CTXT, ttConf.get(JOB_LOCAL_CTXT));
+
+ //update the config some more
+ jConf.set(TokenCache.JOB_TOKENS_FILENAME, localJobTokenFile.toString());
+ jConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
+ ttConf.get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+ TaskTracker.resetNumTasksPerJvm(jConf);
+
+ //setup the distributed cache
+ final long[] sizes = downloadPrivateCache(jConf);
+ if (sizes != null) {
+ //the following doAs is required because the DefaultTaskController
+ //calls the localizeJobFiles method in the context of the TaskTracker
+ //process. The JVM authorization check would fail without this
+ //doAs. In the LinuxTC case, this doesn't harm.
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(jobid.toString());
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ public Object run() throws IOException {
+ taskTracker.updatePrivateDistributedCacheSizes(jobid, sizes);
+ return null;
+ }
+ });
+ }
+
+ // Create job-acls.xml file in job userlog dir and write the needed
+ // info for authorization of users for viewing task logs of this job.
+ writeJobACLs(jConf, new Path(TaskLog.getJobDir(jobid).toURI().toString()));
+
+ //write the updated jobConf file in the job directory
+ JobLocalizer.writeLocalJobFile(localJobFile, jConf);
+ }
+
+ /**
+ * Creates job-acls.xml under the given directory logDir and writes
+ * job-view-acl, queue-admins-acl, jobOwner name and queue name into this
+ * file.
+ * queue name is the queue to which the job was submitted to.
+ * queue-admins-acl is the queue admins ACL of the queue to which this
+ * job was submitted to.
+ * @param conf job configuration
+ * @param logDir job userlog dir
+ * @throws IOException
+ */
+ private void writeJobACLs(JobConf conf, Path logDir) throws IOException {
+ JobConf aclConf = new JobConf(false);
+
+ // set the job view acl in aclConf
+ String jobViewACL = conf.get(JobContext.JOB_ACL_VIEW_JOB, " ");
+ aclConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACL);
+
+ // set the job queue name in aclConf
+ String queue = conf.getQueueName();
+ aclConf.setQueueName(queue);
+
+ // set the queue admins acl in aclConf
+ String qACLName = QueueManager.toFullPropertyName(queue,
+ QueueACL.ADMINISTER_JOBS.getAclName());
+ String queueAdminsACL = conf.get(qACLName, " ");
+ aclConf.set(qACLName, queueAdminsACL);
+
+ // set jobOwner as user.name in aclConf
+ aclConf.set("user.name", user);
+
+ OutputStream out = null;
+ Path aclFile = new Path(logDir, TaskTracker.jobACLsFile);
+ try {
+ out = lfs.create(aclFile);
+ aclConf.writeXml(out);
+ } finally {
+ IOUtils.cleanup(LOG, out);
+ }
+ lfs.setPermission(aclFile, urw_gr);
+ }
+
+ public void createWorkDir(JobConf jConf) throws IOException {
+ // create $mapred.local.dir/taskTracker/$user/jobcache/$jobid/work
+ final Path workDir = lDirAlloc.getLocalPathForWrite(WORKDIR, ttConf);
+ if (!lfs.mkdirs(workDir)) {
+ throw new IOException("Mkdirs failed to create "
+ + workDir.toString());
+ }
+ jConf.set(TaskTracker.JOB_LOCAL_DIR, workDir.toUri().getPath());
+ }
+
+ public Path findCredentials() throws IOException {
+ return lDirAlloc.getLocalPathToRead(JOBTOKEN, ttConf);
+ }
+
+ public int runSetup(String user, String jobid, Path localJobTokenFile,
+ TaskUmbilicalProtocol taskTracker)
+ throws IOException, InterruptedException {
+ // load user credentials, configuration
+ // ASSUME
+ // let $x = $mapred.local.dir
+ // forall $x, exists $x/$user
+ // exists $x/$user/jobcache/$jobid/job.xml
+ // exists $x/$user/jobcache/$jobid/jobToken
+ // exists $logdir/userlogs/$jobid
+ final Path localJobFile = lDirAlloc.getLocalPathToRead(JOBCONF, ttConf);
+ final JobConf cfgJob = new JobConf(localJobFile);
+ createWorkDir(cfgJob);
+ localizeJobFiles(JobID.forName(jobid), cfgJob, localJobFile,
+ localJobTokenFile, taskTracker);
+
+ // $mapred.local.dir/taskTracker/$user/distcache
+ return 0;
+ }
+
+ public static void main(String[] argv)
+ throws IOException, InterruptedException {
+ // $logdir
+ // let $x = $root/tasktracker for some $mapred.local.dir
+ // create $x/$user/jobcache/$jobid/work
+ // fetch $x/$user/jobcache/$jobid/jars/job.jar
+ // setup $x/$user/distcache
+ // verify $x/distcache
+ // write $x/$user/jobcache/$jobid/job.xml
+ final String user = argv[0];
+ final String jobid = argv[1];
+ final InetSocketAddress ttAddr =
+ new InetSocketAddress(argv[2], Integer.parseInt(argv[3]));
+ final String uid = UserGroupInformation.getCurrentUser().getShortUserName();
+ if (!user.equals(uid)) {
+ LOG.warn("Localization running as " + uid + " not " + user);
+ }
+
+ // Pull in user's tokens to complete setup
+ final JobConf conf = new JobConf();
+ final JobLocalizer localizer =
+ new JobLocalizer(conf, user, jobid);
+ final Path jobTokenFile = localizer.findCredentials();
+ final Credentials creds = TokenCache.loadTokens(
+ jobTokenFile.toUri().toString(), conf);
+ LOG.debug("Loaded tokens from " + jobTokenFile);
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+ for (Token<? extends TokenIdentifier> token : creds.getAllTokens()) {
+ ugi.addToken(token);
+ }
+
+ UserGroupInformation ugiJob = UserGroupInformation.createRemoteUser(jobid);
+ Token<JobTokenIdentifier> jt = TokenCache.getJobToken(creds);
+ jt.setService(new Text(ttAddr.getAddress().getHostAddress() + ":"
+ + ttAddr.getPort()));
+ ugiJob.addToken(jt);
+
+ final TaskUmbilicalProtocol taskTracker =
+ ugiJob.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
+ public TaskUmbilicalProtocol run() throws IOException {
+ TaskUmbilicalProtocol taskTracker =
+ (TaskUmbilicalProtocol) RPC.getProxy(TaskUmbilicalProtocol.class,
+ TaskUmbilicalProtocol.versionID,
+ ttAddr, conf);
+ return taskTracker;
+ }
+ });
+ System.exit(
+ ugi.doAs(new PrivilegedExceptionAction<Integer>() {
+ public Integer run() {
+ try {
+ return localizer.runSetup(user, jobid, jobTokenFile, taskTracker);
+ } catch (Throwable e) {
+ e.printStackTrace(System.out);
+ return -1;
+ }
+ }
+ }));
+ }
+
+ /**
+ * Write the task specific job-configuration file.
+ * @throws IOException
+ */
+ public static void writeLocalJobFile(Path jobFile, JobConf conf)
+ throws IOException {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ localFs.delete(jobFile);
+ OutputStream out = null;
+ try {
+ out = FileSystem.create(localFs, jobFile, urw_gr);
+ conf.writeXml(out);
+ } finally {
+ IOUtils.cleanup(LOG, out);
+ }
+ }
+
+}