You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 06:56:31 UTC

svn commit: r1079211 [7/11] - in /hadoop/mapreduce/branches/yahoo-merge: ./ src/c++/task-controller/ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/c++/task-controller/tests/ src/contrib/fairscheduler/designdoc/ src/contrib/streaming/s...

Added: hadoop/mapreduce/branches/yahoo-merge/src/c++/task-controller/missing
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/c%2B%2B/task-controller/missing?rev=1079211&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/c++/task-controller/missing (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/c++/task-controller/missing Tue Mar  8 05:56:27 2011
@@ -0,0 +1,360 @@
+#! /bin/sh
+# Common stub for a few missing GNU programs while installing.
+
+scriptversion=2005-06-08.21
+
+# Copyright (C) 1996, 1997, 1999, 2000, 2002, 2003, 2004, 2005
+#   Free Software Foundation, Inc.
+# Originally by Fran,cois Pinard <pi...@iro.umontreal.ca>, 1996.
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+# As a special exception to the GNU General Public License, if you
+# distribute this file as part of a program that contains a
+# configuration script generated by Autoconf, you may include it under
+# the same distribution terms that you use for the rest of that program.
+
+if test $# -eq 0; then
+  echo 1>&2 "Try \`$0 --help' for more information"
+  exit 1
+fi
+
+run=:
+
+# In the cases where this matters, `missing' is being run in the
+# srcdir already.
+if test -f configure.ac; then
+  configure_ac=configure.ac
+else
+  configure_ac=configure.in
+fi
+
+msg="missing on your system"
+
+case "$1" in
+--run)
+  # Try to run requested program, and just exit if it succeeds.
+  run=
+  shift
+  "$@" && exit 0
+  # Exit code 63 means version mismatch.  This often happens
+  # when the user try to use an ancient version of a tool on
+  # a file that requires a minimum version.  In this case we
+  # we should proceed has if the program had been absent, or
+  # if --run hadn't been passed.
+  if test $? = 63; then
+    run=:
+    msg="probably too old"
+  fi
+  ;;
+
+  -h|--h|--he|--hel|--help)
+    echo "\
+$0 [OPTION]... PROGRAM [ARGUMENT]...
+
+Handle \`PROGRAM [ARGUMENT]...' for when PROGRAM is missing, or return an
+error status if there is no known handling for PROGRAM.
+
+Options:
+  -h, --help      display this help and exit
+  -v, --version   output version information and exit
+  --run           try to run the given command, and emulate it if it fails
+
+Supported PROGRAM values:
+  aclocal      touch file \`aclocal.m4'
+  autoconf     touch file \`configure'
+  autoheader   touch file \`config.h.in'
+  automake     touch all \`Makefile.in' files
+  bison        create \`y.tab.[ch]', if possible, from existing .[ch]
+  flex         create \`lex.yy.c', if possible, from existing .c
+  help2man     touch the output file
+  lex          create \`lex.yy.c', if possible, from existing .c
+  makeinfo     touch the output file
+  tar          try tar, gnutar, gtar, then tar without non-portable flags
+  yacc         create \`y.tab.[ch]', if possible, from existing .[ch]
+
+Send bug reports to <bu...@gnu.org>."
+    exit $?
+    ;;
+
+  -v|--v|--ve|--ver|--vers|--versi|--versio|--version)
+    echo "missing $scriptversion (GNU Automake)"
+    exit $?
+    ;;
+
+  -*)
+    echo 1>&2 "$0: Unknown \`$1' option"
+    echo 1>&2 "Try \`$0 --help' for more information"
+    exit 1
+    ;;
+
+esac
+
+# Now exit if we have it, but it failed.  Also exit now if we
+# don't have it and --version was passed (most likely to detect
+# the program).
+case "$1" in
+  lex|yacc)
+    # Not GNU programs, they don't have --version.
+    ;;
+
+  tar)
+    if test -n "$run"; then
+       echo 1>&2 "ERROR: \`tar' requires --run"
+       exit 1
+    elif test "x$2" = "x--version" || test "x$2" = "x--help"; then
+       exit 1
+    fi
+    ;;
+
+  *)
+    if test -z "$run" && ($1 --version) > /dev/null 2>&1; then
+       # We have it, but it failed.
+       exit 1
+    elif test "x$2" = "x--version" || test "x$2" = "x--help"; then
+       # Could not run --version or --help.  This is probably someone
+       # running `$TOOL --version' or `$TOOL --help' to check whether
+       # $TOOL exists and not knowing $TOOL uses missing.
+       exit 1
+    fi
+    ;;
+esac
+
+# If it does not exist, or fails to run (possibly an outdated version),
+# try to emulate it.
+case "$1" in
+  aclocal*)
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified \`acinclude.m4' or \`${configure_ac}'.  You might want
+         to install the \`Automake' and \`Perl' packages.  Grab them from
+         any GNU archive site."
+    touch aclocal.m4
+    ;;
+
+  autoconf)
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified \`${configure_ac}'.  You might want to install the
+         \`Autoconf' and \`GNU m4' packages.  Grab them from any GNU
+         archive site."
+    touch configure
+    ;;
+
+  autoheader)
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified \`acconfig.h' or \`${configure_ac}'.  You might want
+         to install the \`Autoconf' and \`GNU m4' packages.  Grab them
+         from any GNU archive site."
+    files=`sed -n 's/^[ ]*A[CM]_CONFIG_HEADER(\([^)]*\)).*/\1/p' ${configure_ac}`
+    test -z "$files" && files="config.h"
+    touch_files=
+    for f in $files; do
+      case "$f" in
+      *:*) touch_files="$touch_files "`echo "$f" |
+				       sed -e 's/^[^:]*://' -e 's/:.*//'`;;
+      *) touch_files="$touch_files $f.in";;
+      esac
+    done
+    touch $touch_files
+    ;;
+
+  automake*)
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified \`Makefile.am', \`acinclude.m4' or \`${configure_ac}'.
+         You might want to install the \`Automake' and \`Perl' packages.
+         Grab them from any GNU archive site."
+    find . -type f -name Makefile.am -print |
+	   sed 's/\.am$/.in/' |
+	   while read f; do touch "$f"; done
+    ;;
+
+  autom4te)
+    echo 1>&2 "\
+WARNING: \`$1' is needed, but is $msg.
+         You might have modified some files without having the
+         proper tools for further handling them.
+         You can get \`$1' as part of \`Autoconf' from any GNU
+         archive site."
+
+    file=`echo "$*" | sed -n 's/.*--output[ =]*\([^ ]*\).*/\1/p'`
+    test -z "$file" && file=`echo "$*" | sed -n 's/.*-o[ ]*\([^ ]*\).*/\1/p'`
+    if test -f "$file"; then
+	touch $file
+    else
+	test -z "$file" || exec >$file
+	echo "#! /bin/sh"
+	echo "# Created by GNU Automake missing as a replacement of"
+	echo "#  $ $@"
+	echo "exit 0"
+	chmod +x $file
+	exit 1
+    fi
+    ;;
+
+  bison|yacc)
+    echo 1>&2 "\
+WARNING: \`$1' $msg.  You should only need it if
+         you modified a \`.y' file.  You may need the \`Bison' package
+         in order for those modifications to take effect.  You can get
+         \`Bison' from any GNU archive site."
+    rm -f y.tab.c y.tab.h
+    if [ $# -ne 1 ]; then
+        eval LASTARG="\${$#}"
+	case "$LASTARG" in
+	*.y)
+	    SRCFILE=`echo "$LASTARG" | sed 's/y$/c/'`
+	    if [ -f "$SRCFILE" ]; then
+	         cp "$SRCFILE" y.tab.c
+	    fi
+	    SRCFILE=`echo "$LASTARG" | sed 's/y$/h/'`
+	    if [ -f "$SRCFILE" ]; then
+	         cp "$SRCFILE" y.tab.h
+	    fi
+	  ;;
+	esac
+    fi
+    if [ ! -f y.tab.h ]; then
+	echo >y.tab.h
+    fi
+    if [ ! -f y.tab.c ]; then
+	echo 'main() { return 0; }' >y.tab.c
+    fi
+    ;;
+
+  lex|flex)
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified a \`.l' file.  You may need the \`Flex' package
+         in order for those modifications to take effect.  You can get
+         \`Flex' from any GNU archive site."
+    rm -f lex.yy.c
+    if [ $# -ne 1 ]; then
+        eval LASTARG="\${$#}"
+	case "$LASTARG" in
+	*.l)
+	    SRCFILE=`echo "$LASTARG" | sed 's/l$/c/'`
+	    if [ -f "$SRCFILE" ]; then
+	         cp "$SRCFILE" lex.yy.c
+	    fi
+	  ;;
+	esac
+    fi
+    if [ ! -f lex.yy.c ]; then
+	echo 'main() { return 0; }' >lex.yy.c
+    fi
+    ;;
+
+  help2man)
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+	 you modified a dependency of a manual page.  You may need the
+	 \`Help2man' package in order for those modifications to take
+	 effect.  You can get \`Help2man' from any GNU archive site."
+
+    file=`echo "$*" | sed -n 's/.*-o \([^ ]*\).*/\1/p'`
+    if test -z "$file"; then
+	file=`echo "$*" | sed -n 's/.*--output=\([^ ]*\).*/\1/p'`
+    fi
+    if [ -f "$file" ]; then
+	touch $file
+    else
+	test -z "$file" || exec >$file
+	echo ".ab help2man is required to generate this page"
+	exit 1
+    fi
+    ;;
+
+  makeinfo)
+    echo 1>&2 "\
+WARNING: \`$1' is $msg.  You should only need it if
+         you modified a \`.texi' or \`.texinfo' file, or any other file
+         indirectly affecting the aspect of the manual.  The spurious
+         call might also be the consequence of using a buggy \`make' (AIX,
+         DU, IRIX).  You might want to install the \`Texinfo' package or
+         the \`GNU make' package.  Grab either from any GNU archive site."
+    # The file to touch is that specified with -o ...
+    file=`echo "$*" | sed -n 's/.*-o \([^ ]*\).*/\1/p'`
+    if test -z "$file"; then
+      # ... or it is the one specified with @setfilename ...
+      infile=`echo "$*" | sed 's/.* \([^ ]*\) *$/\1/'`
+      file=`sed -n '/^@setfilename/ { s/.* \([^ ]*\) *$/\1/; p; q; }' $infile`
+      # ... or it is derived from the source name (dir/f.texi becomes f.info)
+      test -z "$file" && file=`echo "$infile" | sed 's,.*/,,;s,.[^.]*$,,'`.info
+    fi
+    # If the file does not exist, the user really needs makeinfo;
+    # let's fail without touching anything.
+    test -f $file || exit 1
+    touch $file
+    ;;
+
+  tar)
+    shift
+
+    # We have already tried tar in the generic part.
+    # Look for gnutar/gtar before invocation to avoid ugly error
+    # messages.
+    if (gnutar --version > /dev/null 2>&1); then
+       gnutar "$@" && exit 0
+    fi
+    if (gtar --version > /dev/null 2>&1); then
+       gtar "$@" && exit 0
+    fi
+    firstarg="$1"
+    if shift; then
+	case "$firstarg" in
+	*o*)
+	    firstarg=`echo "$firstarg" | sed s/o//`
+	    tar "$firstarg" "$@" && exit 0
+	    ;;
+	esac
+	case "$firstarg" in
+	*h*)
+	    firstarg=`echo "$firstarg" | sed s/h//`
+	    tar "$firstarg" "$@" && exit 0
+	    ;;
+	esac
+    fi
+
+    echo 1>&2 "\
+WARNING: I can't seem to be able to run \`tar' with the given arguments.
+         You may want to install GNU tar or Free paxutils, or check the
+         command line arguments."
+    exit 1
+    ;;
+
+  *)
+    echo 1>&2 "\
+WARNING: \`$1' is needed, and is $msg.
+         You might have modified some files without having the
+         proper tools for further handling them.  Check the \`README' file,
+         it often tells you about the needed prerequisites for installing
+         this package.  You may also peek at any GNU archive site, in case
+         some other package would contain this missing \`$1' program."
+    exit 1
+    ;;
+esac
+
+exit 0
+
+# Local variables:
+# eval: (add-hook 'write-file-hooks 'time-stamp)
+# time-stamp-start: "scriptversion="
+# time-stamp-format: "%:y-%02m-%02d.%02H"
+# time-stamp-end: "$"
+# End:

Propchange: hadoop/mapreduce/branches/yahoo-merge/src/c++/task-controller/missing
------------------------------------------------------------------------------
    svn:executable = *

Added: hadoop/mapreduce/branches/yahoo-merge/src/c++/task-controller/test/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/c%2B%2B/task-controller/test/test-task-controller.c?rev=1079211&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/c++/task-controller/test/test-task-controller.c (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/c++/task-controller/test/test-task-controller.c Tue Mar  8 05:56:27 2011
@@ -0,0 +1,763 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "configuration.h"
+#include "task-controller.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
+
+#define TEST_ROOT "/tmp/test-task-controller"
+#define DONT_TOUCH_FILE "dont-touch-me"
+
+static char* username = NULL;
+
+/**
+ * Run the command using the effective user id.
+ * It can't use system, since bash seems to copy the real user id into the
+ * effective id.
+ */
+void run(const char *cmd) {
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: failed to fork - %s\n", strerror(errno));
+  } else if (child == 0) {
+    char *cmd_copy = strdup(cmd);
+    char *ptr;
+    int words = 1;
+    for(ptr = strchr(cmd_copy, ' ');  ptr; ptr = strchr(ptr+1, ' ')) {
+      words += 1;
+    }
+    char **argv = malloc(sizeof(char *) * (words + 1));
+    ptr = strtok(cmd_copy, " ");
+    int i = 0;
+    argv[i++] = ptr;
+    while (ptr != NULL) {
+      ptr = strtok(NULL, " ");
+      argv[i++] = ptr;
+    }
+    if (execvp(argv[0], argv) != 0) {
+      printf("FAIL: exec failed in child %s - %s\n", cmd, strerror(errno));
+      exit(42);
+    }
+  } else {
+    int status = 0;
+    if (waitpid(child, &status, 0) <= 0) {
+      printf("FAIL: failed waiting for child process %s pid %d - %s\n", 
+	     cmd, child, strerror(errno));
+      exit(1);
+    }
+    if (!WIFEXITED(status)) {
+      printf("FAIL: process %s pid %d did not exit\n", cmd, child);
+      exit(1);
+    }
+    if (WEXITSTATUS(status) != 0) {
+      printf("FAIL: process %s pid %d exited with error status %d\n", cmd, 
+	     child, WEXITSTATUS(status));
+      exit(1);
+    }
+  }
+}
+
+int write_config_file(char *file_name) {
+  FILE *file;
+  file = fopen(file_name, "w");
+  if (file == NULL) {
+    printf("Failed to open %s.\n", file_name);
+    return EXIT_FAILURE;
+  }
+  fprintf(file, "mapred.local.dir=" TEST_ROOT "/local-1");
+  int i;
+  for(i=2; i < 5; ++i) {
+    fprintf(file, "," TEST_ROOT "/local-%d", i);
+  }
+  fprintf(file, "\n");
+  fprintf(file, "hadoop.log.dir=" TEST_ROOT "/logs\n");
+  fclose(file);
+  return 0;
+}
+
+void create_tt_roots() {
+  char** tt_roots = get_values("mapred.local.dir");
+  char** tt_root;
+  for(tt_root=tt_roots; *tt_root != NULL; ++tt_root) {
+    if (mkdir(*tt_root, 0755) != 0) {
+      printf("FAIL: Can't create directory %s - %s\n", *tt_root,
+	     strerror(errno));
+      exit(1);
+    }
+    char buffer[100000];
+    sprintf(buffer, "%s/taskTracker", *tt_root);
+    if (mkdir(buffer, 0755) != 0) {
+      printf("FAIL: Can't create directory %s - %s\n", buffer,
+	     strerror(errno));
+      exit(1);
+    }
+  }
+  free_values(tt_roots);
+}
+
+void test_get_user_directory() {
+  char *user_dir = get_user_directory("/tmp", "user");
+  char *expected = "/tmp/taskTracker/user";
+  if (strcmp(user_dir, expected) != 0) {
+    printf("test_get_user_directory expected %s got %s\n", user_dir, expected);
+    exit(1);
+  }
+  free(user_dir);
+}
+
+void test_get_job_directory() {
+  char *expected = "/tmp/taskTracker/user/jobcache/job_200906101234_0001";
+  char *job_dir = (char *) get_job_directory("/tmp", "user",
+      "job_200906101234_0001");
+  if (strcmp(job_dir, expected) != 0) {
+    exit(1);
+  }
+  free(job_dir);
+}
+
+void test_get_attempt_directory() {
+  char *attempt_dir = get_attempt_work_directory("/tmp", "owen", "job_1",
+						 "attempt_1");
+  char *expected = "/tmp/taskTracker/owen/jobcache/job_1/attempt_1/work";
+  if (strcmp(attempt_dir, expected) != 0) {
+    printf("Fail get_attempt_work_directory got %s expected %s\n",
+	   attempt_dir, expected);
+  }
+  free(attempt_dir);
+}
+
+void test_get_task_launcher_file() {
+  char *expected_file = ("/tmp/taskTracker/user/jobcache/job_200906101234_0001"
+			 "/taskjvm.sh");
+  char *job_dir = get_job_directory("/tmp", "user",
+                                    "job_200906101234_0001");
+  char *task_file =  get_task_launcher_file(job_dir);
+  if (strcmp(task_file, expected_file) != 0) {
+    printf("failure to match expected task file %s vs %s\n", task_file,
+           expected_file);
+    exit(1);
+  }
+  free(job_dir);
+  free(task_file);
+}
+
+void test_get_job_log_dir() {
+  char *expected = TEST_ROOT "/logs/userlogs/job_200906101234_0001";
+  char *logdir = get_job_log_directory("job_200906101234_0001");
+  if (strcmp(logdir, expected) != 0) {
+    printf("Fail get_job_log_dir got %s expected %s\n", logdir, expected);
+    exit(1);
+  }
+  free(logdir);
+}
+
+void test_get_task_log_dir() {
+  char *logdir = get_job_log_directory("job_5/task_4");
+  char *expected = TEST_ROOT "/logs/userlogs/job_5/task_4";
+  if (strcmp(logdir, expected) != 0) {
+    printf("FAIL: get_task_log_dir expected %s got %s\n", logdir, expected);
+  }
+  free(logdir);
+}
+
+void test_check_user() {
+  printf("\nTesting test_check_user\n");
+  struct passwd *user = check_user(username);
+  if (user == NULL) {
+    printf("FAIL: failed check for user %s\n", username);
+    exit(1);
+  }
+  free(user);
+  if (check_user("lp") != NULL) {
+    printf("FAIL: failed check for system user lp\n");
+    exit(1);
+  }
+  if (check_user("root") != NULL) {
+    printf("FAIL: failed check for system user root\n");
+    exit(1);
+  }
+  if (check_user("mapred") != NULL) {
+    printf("FAIL: failed check for hadoop user mapred\n");
+    exit(1);
+  }
+}
+
+void test_check_configuration_permissions() {
+  printf("\nTesting check_configuration_permissions\n");
+  if (check_configuration_permissions("/etc/passwd") != 0) {
+    printf("FAIL: failed permission check on /etc/passwd\n");
+    exit(1);
+  }
+  if (check_configuration_permissions(TEST_ROOT) == 0) {
+    printf("FAIL: failed permission check on %s\n", TEST_ROOT);
+    exit(1);
+  }
+}
+
+void test_delete_task() {
+  if (initialize_user(username)) {
+    printf("FAIL: failed to initialized user %s\n", username);
+    exit(1);
+  }
+  char* job_dir = get_job_directory(TEST_ROOT "/local-2", username, "job_1");
+  char* dont_touch = get_job_directory(TEST_ROOT "/local-2", username, 
+                                       DONT_TOUCH_FILE);
+  char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-2", 
+					      username, "job_1", "task_1");
+  char buffer[100000];
+  sprintf(buffer, "mkdir -p %s/who/let/the/dogs/out/who/who", task_dir);
+  run(buffer);
+  sprintf(buffer, "touch %s", dont_touch);
+  run(buffer);
+
+  // soft link to the canary file from the task directory
+  sprintf(buffer, "ln -s %s %s/who/softlink", dont_touch, task_dir);
+  run(buffer);
+  // hard link to the canary file from the task directory
+  sprintf(buffer, "ln %s %s/who/hardlink", dont_touch, task_dir);
+  run(buffer);
+  // create a dot file in the task directory
+  sprintf(buffer, "touch %s/who/let/.dotfile", task_dir);
+  run(buffer);
+  // create a no permission file
+  sprintf(buffer, "touch %s/who/let/protect", task_dir);
+  run(buffer);
+  sprintf(buffer, "chmod 000 %s/who/let/protect", task_dir);
+  run(buffer);
+  // create a no permission directory
+  sprintf(buffer, "chmod 000 %s/who/let", task_dir);
+  run(buffer);
+
+  // delete task directory
+  int ret = delete_as_user(username, "jobcache/job_1/task_1");
+  if (ret != 0) {
+    printf("FAIL: return code from delete_as_user is %d\n", ret);
+    exit(1);
+  }
+
+  // check to make sure the task directory is gone
+  if (access(task_dir, R_OK) == 0) {
+    printf("FAIL: failed to delete the directory - %s\n", task_dir);
+    exit(1);
+  }
+  // check to make sure the job directory is not gone
+  if (access(job_dir, R_OK) != 0) {
+    printf("FAIL: accidently deleted the directory - %s\n", job_dir);
+    exit(1);
+  }
+  // but that the canary is not gone
+  if (access(dont_touch, R_OK) != 0) {
+    printf("FAIL: accidently deleted file %s\n", dont_touch);
+    exit(1);
+  }
+  sprintf(buffer, "chmod -R 700 %s", job_dir);
+  run(buffer);
+  sprintf(buffer, "rm -fr %s", job_dir);
+  run(buffer);
+  free(job_dir);
+  free(task_dir);
+  free(dont_touch);
+}
+
+void test_delete_job() {
+  char* job_dir = get_job_directory(TEST_ROOT "/local-2", username, "job_2");
+  char* dont_touch = get_job_directory(TEST_ROOT "/local-2", username, 
+                                       DONT_TOUCH_FILE);
+  char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-2", 
+					      username, "job_2", "task_1");
+  char buffer[100000];
+  sprintf(buffer, "mkdir -p %s/who/let/the/dogs/out/who/who", task_dir);
+  run(buffer);
+  sprintf(buffer, "touch %s", dont_touch);
+  run(buffer);
+
+  // soft link to the canary file from the task directory
+  sprintf(buffer, "ln -s %s %s/who/softlink", dont_touch, task_dir);
+  run(buffer);
+  // hard link to the canary file from the task directory
+  sprintf(buffer, "ln %s %s/who/hardlink", dont_touch, task_dir);
+  run(buffer);
+  // create a dot file in the task directory
+  sprintf(buffer, "touch %s/who/let/.dotfile", task_dir);
+  run(buffer);
+  // create a no permission file
+  sprintf(buffer, "touch %s/who/let/protect", task_dir);
+  run(buffer);
+  sprintf(buffer, "chmod 000 %s/who/let/protect", task_dir);
+  run(buffer);
+  // create a no permission directory
+  sprintf(buffer, "chmod 000 %s/who/let", task_dir);
+  run(buffer);
+
+  // delete task directory
+  int ret = delete_as_user(username, "jobcache/job_2");
+  if (ret != 0) {
+    printf("FAIL: return code from delete_as_user is %d\n", ret);
+    exit(1);
+  }
+
+  // check to make sure the task directory is gone
+  if (access(task_dir, R_OK) == 0) {
+    printf("FAIL: failed to delete the directory - %s\n", task_dir);
+    exit(1);
+  }
+  // check to make sure the job directory is gone
+  if (access(job_dir, R_OK) == 0) {
+    printf("FAIL: didn't delete the directory - %s\n", job_dir);
+    exit(1);
+  }
+  // but that the canary is not gone
+  if (access(dont_touch, R_OK) != 0) {
+    printf("FAIL: accidently deleted file %s\n", dont_touch);
+    exit(1);
+  }
+  free(job_dir);
+  free(task_dir);
+  free(dont_touch);
+}
+
+
+void test_delete_user() {
+  printf("\nTesting delete_user\n");
+  char* job_dir = get_job_directory(TEST_ROOT "/local-1", username, "job_3");
+  if (mkdirs(job_dir, 0700) != 0) {
+    exit(1);
+  }
+  char buffer[100000];
+  sprintf(buffer, "%s/local-1/taskTracker/%s", TEST_ROOT, username);
+  if (access(buffer, R_OK) != 0) {
+    printf("FAIL: directory missing before test\n");
+    exit(1);
+  }
+  if (delete_as_user(username, "") != 0) {
+    exit(1);
+  }
+  if (access(buffer, R_OK) == 0) {
+    printf("FAIL: directory not deleted\n");
+    exit(1);
+  }
+  if (access(TEST_ROOT "/local-1", R_OK) != 0) {
+    printf("FAIL: local-1 directory does not exist\n");
+    exit(1);
+  }
+  free(job_dir);
+}
+
+void test_delete_log_directory() {
+  printf("\nTesting delete_log_directory\n");
+  char *job_log_dir = get_job_log_directory("job_1");
+  if (job_log_dir == NULL) {
+    exit(1);
+  }
+  if (create_directory_for_user(job_log_dir) != 0) {
+    exit(1);
+  }
+  free(job_log_dir);
+  char *task_log_dir = get_job_log_directory("job_1/task_2");
+  if (task_log_dir == NULL) {
+    exit(1);
+  }
+  if (mkdirs(task_log_dir, 0700) != 0) {
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_1/task_2", R_OK) != 0) {
+    printf("FAIL: can't access task directory - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (delete_log_directory("job_1/task_2") != 0) {
+    printf("FAIL: can't delete task directory\n");
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_1/task_2", R_OK) == 0) {
+    printf("FAIL: task directory not deleted\n");
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_1", R_OK) != 0) {
+    printf("FAIL: job directory not deleted - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (delete_log_directory("job_1") != 0) {
+    printf("FAIL: can't delete task directory\n");
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_1", R_OK) == 0) {
+    printf("FAIL: job directory not deleted\n");
+    exit(1);
+  }
+  free(task_log_dir);
+}
+
+void run_test_in_child(const char* test_name, void (*func)()) {
+  printf("\nRunning test %s in child process\n", test_name);
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: fork failed\n");
+    exit(1);
+  } else if (child == 0) {
+    func();
+    exit(0);
+  } else {
+    int status = 0;
+    if (waitpid(child, &status, 0) == -1) {
+      printf("FAIL: waitpid %d failed - %s\n", child, strerror(errno));
+      exit(1);
+    }
+    if (!WIFEXITED(status)) {
+      printf("FAIL: child %d didn't exit - %d\n", child, status);
+      exit(1);
+    }
+    if (WEXITSTATUS(status) != 0) {
+      printf("FAIL: child %d exited with bad status %d\n",
+	     child, WEXITSTATUS(status));
+      exit(1);
+    }
+  }
+}
+
+void test_signal_task() {
+  printf("\nTesting signal_task\n");
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: fork failed\n");
+    exit(1);
+  } else if (child == 0) {
+    if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+      exit(1);
+    }
+    sleep(3600);
+    exit(0);
+  } else {
+    printf("Child task launched as %d\n", child);
+    if (signal_user_task(username, child, SIGQUIT) != 0) {
+      exit(1);
+    }
+    int status = 0;
+    if (waitpid(child, &status, 0) == -1) {
+      printf("FAIL: waitpid failed - %s\n", strerror(errno));
+      exit(1);
+    }
+    if (!WIFSIGNALED(status)) {
+      printf("FAIL: child wasn't signalled - %d\n", status);
+      exit(1);
+    }
+    if (WTERMSIG(status) != SIGQUIT) {
+      printf("FAIL: child was killed with %d instead of %d\n", 
+	     WTERMSIG(status), SIGQUIT);
+      exit(1);
+    }
+  }
+}
+
+void test_signal_task_group() {
+  printf("\nTesting group signal_task\n");
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: fork failed\n");
+    exit(1);
+  } else if (child == 0) {
+    setpgrp();
+    if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+      exit(1);
+    }
+    sleep(3600);
+    exit(0);
+  }
+  printf("Child task launched as %d\n", child);
+  if (signal_user_task(username, child, SIGKILL) != 0) {
+    exit(1);
+  }
+  int status = 0;
+  if (waitpid(child, &status, 0) == -1) {
+    printf("FAIL: waitpid failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (!WIFSIGNALED(status)) {
+    printf("FAIL: child wasn't signalled - %d\n", status);
+    exit(1);
+  }
+  if (WTERMSIG(status) != SIGKILL) {
+    printf("FAIL: child was killed with %d instead of %d\n", 
+	   WTERMSIG(status), SIGKILL);
+    exit(1);
+  }
+}
+
+void test_init_job() {
+  printf("\nTesting init job\n");
+  if (seteuid(0) != 0) {
+    printf("FAIL: seteuid to root failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  FILE* creds = fopen(TEST_ROOT "/creds.txt", "w");
+  if (creds == NULL) {
+    printf("FAIL: failed to create credentials file - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fprintf(creds, "secret key\n") < 0) {
+    printf("FAIL: fprintf failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fclose(creds) != 0) {
+    printf("FAIL: fclose failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  FILE* job_xml = fopen(TEST_ROOT "/job.xml", "w");
+  if (job_xml == NULL) {
+    printf("FAIL: failed to create job file - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fprintf(job_xml, "<jobconf/>\n") < 0) {
+    printf("FAIL: fprintf failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fclose(job_xml) != 0) {
+    printf("FAIL: fclose failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (seteuid(user_detail->pw_uid) != 0) {
+    printf("FAIL: failed to seteuid back to user - %s\n", strerror(errno));
+    exit(1);
+  }
+  fflush(stdout);
+  fflush(stderr);
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: failed to fork process for init_job - %s\n", 
+	   strerror(errno));
+    exit(1);
+  } else if (child == 0) {
+    char *final_pgm[] = {"touch", "my-touch-file", 0};
+    if (initialize_job(username, "job_4", TEST_ROOT "/creds.txt", 
+                       TEST_ROOT "/job.xml", final_pgm) != 0) {
+      printf("FAIL: failed in child\n");
+      exit(42);
+    }
+    // should never return
+    exit(1);
+  }
+  int status = 0;
+  if (waitpid(child, &status, 0) <= 0) {
+    printf("FAIL: failed waiting for process %d - %s\n", child, 
+	   strerror(errno));
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_4", R_OK) != 0) {
+    printf("FAIL: failed to create job log directory\n");
+    exit(1);
+  }
+  char* job_dir = get_job_directory(TEST_ROOT "/local-1", username, "job_4");
+  if (access(job_dir, R_OK) != 0) {
+    printf("FAIL: failed to create job directory %s\n", job_dir);
+    exit(1);
+  }
+  char buffer[100000];
+  sprintf(buffer, "%s/jobToken", job_dir);
+  if (access(buffer, R_OK) != 0) {
+    printf("FAIL: failed to create credentials %s\n", buffer);
+    exit(1);
+  }
+  sprintf(buffer, "%s/my-touch-file", job_dir);
+  if (access(buffer, R_OK) != 0) {
+    printf("FAIL: failed to create touch file %s\n", buffer);
+    exit(1);
+  }
+  free(job_dir);
+  job_dir = get_job_log_directory("job_4");
+  if (access(job_dir, R_OK) != 0) {
+    printf("FAIL: failed to create job log directory %s\n", job_dir);
+    exit(1);
+  }
+  free(job_dir);
+}
+
+void test_run_task() {
+  printf("\nTesting run task\n");
+  if (seteuid(0) != 0) {
+    printf("FAIL: seteuid to root failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  const char* script_name = TEST_ROOT "/task-script";
+  FILE* script = fopen(script_name, "w");
+  if (script == NULL) {
+    printf("FAIL: failed to create script file - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (seteuid(user_detail->pw_uid) != 0) {
+    printf("FAIL: failed to seteuid back to user - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fprintf(script, "#!/bin/bash\n"
+                     "touch foobar\n"
+                     "exit 0") < 0) {
+    printf("FAIL: fprintf failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  if (fclose(script) != 0) {
+    printf("FAIL: fclose failed - %s\n", strerror(errno));
+    exit(1);
+  }
+  fflush(stdout);
+  fflush(stderr);
+  char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-1", 
+					      username, "job_4", "task_1");
+  pid_t child = fork();
+  if (child == -1) {
+    printf("FAIL: failed to fork process for init_job - %s\n", 
+	   strerror(errno));
+    exit(1);
+  } else if (child == 0) {
+    if (run_task_as_user(username, "job_4", "task_1", 
+                         task_dir, script_name) != 0) {
+      printf("FAIL: failed in child\n");
+      exit(42);
+    }
+    // should never return
+    exit(1);
+  }
+  int status = 0;
+  if (waitpid(child, &status, 0) <= 0) {
+    printf("FAIL: failed waiting for process %d - %s\n", child, 
+	   strerror(errno));
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_4/task_1", R_OK) != 0) {
+    printf("FAIL: failed to create task log directory\n");
+    exit(1);
+  }
+  if (access(task_dir, R_OK) != 0) {
+    printf("FAIL: failed to create task directory %s\n", task_dir);
+    exit(1);
+  }
+  char buffer[100000];
+  sprintf(buffer, "%s/foobar", task_dir);
+  if (access(buffer, R_OK) != 0) {
+    printf("FAIL: failed to create touch file %s\n", buffer);
+    exit(1);
+  }
+  free(task_dir);
+  task_dir = get_job_log_directory("job_4/task_1");
+  if (access(task_dir, R_OK) != 0) {
+    printf("FAIL: failed to create job log directory %s\n", task_dir);
+    exit(1);
+  }
+  free(task_dir);
+}
+
+int main(int argc, char **argv) {
+  LOGFILE = stdout;
+  int my_username = 0;
+
+  // clean up any junk from previous run
+  system("chmod -R u=rwx " TEST_ROOT "; rm -fr " TEST_ROOT);
+  
+  if (mkdirs(TEST_ROOT "/logs/userlogs", 0755) != 0) {
+    exit(1);
+  }
+  
+  if (write_config_file(TEST_ROOT "/test.cfg") != 0) {
+    exit(1);
+  }
+  read_config(TEST_ROOT "/test.cfg");
+
+  create_tt_roots();
+
+  if (getuid() == 0 && argc == 2) {
+    username = argv[1];
+  } else {
+    username = strdup(getpwuid(getuid())->pw_name);
+    my_username = 1;
+  }
+  set_tasktracker_uid(geteuid(), getegid());
+
+  if (set_user(username)) {
+    exit(1);
+  }
+
+  printf("\nStarting tests\n");
+
+  printf("\nTesting get_user_directory()\n");
+  test_get_user_directory();
+
+  printf("\nTesting get_job_directory()\n");
+  test_get_job_directory();
+
+  printf("\nTesting get_attempt_directory()\n");
+  test_get_attempt_directory();
+
+  printf("\nTesting get_task_launcher_file()\n");
+  test_get_task_launcher_file();
+
+  printf("\nTesting get_job_log_dir()\n");
+  test_get_job_log_dir();
+
+  test_check_configuration_permissions();
+
+  printf("\nTesting get_task_log_dir()\n");
+  test_get_task_log_dir();
+
+  printf("\nTesting delete_task()\n");
+  test_delete_task();
+
+  printf("\nTesting delete_job()\n");
+  test_delete_job();
+
+  test_delete_user();
+
+  test_check_user();
+
+  test_delete_log_directory();
+
+  // the tests that change user need to be run in a subshell, so that
+  // when they change user they don't give up our privs
+  run_test_in_child("test_signal_task", test_signal_task);
+  run_test_in_child("test_signal_task_group", test_signal_task_group);
+
+  // init job and run task can't be run if you aren't testing as root
+  if (getuid() == 0) {
+    // these tests do internal forks so that the change_owner and execs
+    // don't mess up our process.
+    test_init_job();
+    test_run_task();
+  }
+
+  seteuid(0);
+  run("rm -fr " TEST_ROOT);
+  printf("\nFinished tests\n");
+
+  if (my_username) {
+    free(username);
+  }
+  free_configurations();
+  return 0;
+}

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/fairscheduler/designdoc/fair_scheduler_design_doc.pdf
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/fairscheduler/designdoc/fair_scheduler_design_doc.pdf?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Mar  8 05:56:27 2011
@@ -220,7 +220,7 @@ public abstract class PipeMapRed {
     } catch (IOException e) {
       LOG.error("configuration exception", e);
       throw new RuntimeException("configuration exception", e);
-    } catch (InterruptedException e)  {
+    } catch (InterruptedException e) {
       LOG.error("configuration exception", e);
       throw new RuntimeException("configuration exception", e);
     }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Child.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Child.java Tue Mar  8 05:56:27 2011
@@ -24,11 +24,14 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -55,6 +58,7 @@ class Child {
 
   static volatile TaskAttemptID taskid = null;
   static volatile boolean isCleanup;
+  static String cwd;
 
   public static void main(String[] args) throws Throwable {
     LOG.debug("Child starting");
@@ -76,6 +80,11 @@ class Child {
     DefaultMetricsSystem.initialize(
         StringUtils.camelize(firstTaskid.getTaskType().name()) +"Task");
 
+    cwd = System.getenv().get(TaskRunner.HADOOP_WORK_DIR);
+    if (cwd == null) {
+      throw new IOException("Environment variable " +
+                             TaskRunner.HADOOP_WORK_DIR + " is not set");
+    }
     //load token cache storage
     String jobTokenFile = 
       System.getenv().get(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
@@ -177,10 +186,6 @@ class Child {
         isCleanup = task.isTaskCleanupTask();
         // reset the statistics for the task
         FileSystem.clearStatistics();
-
-        //create the index file so that the log files 
-        //are viewable immediately
-        TaskLog.syncLogs(logLocation, taskid, isCleanup);
         
         // Create the job-conf and set credentials
         final JobConf job = new JobConf(task.getJobFile());
@@ -193,12 +198,19 @@ class Child {
         // setup the child's Configs.LOCAL_DIR. The child is now sandboxed and
         // can only see files down and under attemtdir only.
         TaskRunner.setupChildMapredLocalDirs(task, job);
+        
+        // setup the child's attempt directories
+        localizeTask(task, job, logLocation);
 
         //setupWorkDir actually sets up the symlinks for the distributed
         //cache. After a task exits we wipe the workdir clean, and hence
         //the symlinks have to be rebuilt.
-        TaskRunner.setupWorkDir(job, new File(".").getAbsoluteFile());
-
+        TaskRunner.setupWorkDir(job, new File(cwd));
+        
+        //create the index file so that the log files 
+        //are viewable immediately
+        TaskLog.syncLogs(logLocation, taskid, isCleanup);
+        
         numTasksToExecute = job.getNumTasksToExecutePerJvm();
         assert(numTasksToExecute != 0);
 
@@ -284,4 +296,19 @@ class Child {
       LogManager.shutdown();
     }
   }
+  static void localizeTask(Task task, JobConf jobConf, String logLocation)
+  throws IOException{
+
+     // Do the task-type specific localization
+     task.localizeConfiguration(jobConf);
+
+     //write the localized task jobconf
+     LocalDirAllocator lDirAlloc =
+       new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
+     Path localTaskFile =
+       lDirAlloc.getLocalPathForWrite(TaskTracker.JOBFILE, jobConf);
+     JobLocalizer.writeLocalJobFile(localTaskFile, jobConf);
+     task.setJobFile(localTaskFile.toString());
+     task.setConf(jobConf);
+   }
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CleanupQueue.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CleanupQueue.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CleanupQueue.java Tue Mar  8 05:56:27 2011
@@ -19,20 +19,26 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 
 class CleanupQueue {
 
   public static final Log LOG =
     LogFactory.getLog(CleanupQueue.class);
 
-  private static PathCleanupThread cleanupThread;
+  private static final PathCleanupThread cleanupThread =
+    new PathCleanupThread();
+  private static final CleanupQueue inst = new CleanupQueue();
+
+  public static CleanupQueue getInstance() { return inst; }
 
   /**
    * Create a singleton path-clean-up queue. It can be used to delete
@@ -42,59 +48,53 @@ class CleanupQueue {
    * {@link CleanupQueue#addToQueue(PathDeletionContext...)} to add paths for
    * deletion.
    */
-  public CleanupQueue() {
-    synchronized (PathCleanupThread.class) {
-      if (cleanupThread == null) {
-        cleanupThread = new PathCleanupThread();
-      }
-    }
-  }
+  protected CleanupQueue() { }
   
   /**
    * Contains info related to the path of the file/dir to be deleted
    */
   static class PathDeletionContext {
-    String fullPath;// full path of file or dir
-    FileSystem fs;
+    final Path fullPath;// full path of file or dir
+    final Configuration conf;
 
-    public PathDeletionContext(FileSystem fs, String fullPath) {
-      this.fs = fs;
+    public PathDeletionContext(Path fullPath, Configuration conf) {
       this.fullPath = fullPath;
+      this.conf = conf;
     }
     
-    protected String getPathForCleanup() {
+    protected Path getPathForCleanup() {
       return fullPath;
     }
 
     /**
-     * Makes the path(and its subdirectories recursively) fully deletable
+     * Deletes the path (and its subdirectories recursively)
+     * @throws IOException, InterruptedException 
      */
-    protected void enablePathForCleanup() throws IOException {
-      // Do nothing by default.
-      // Subclasses can override to provide enabling for deletion.
+    protected void deletePath() throws IOException, InterruptedException {
+      final Path p = getPathForCleanup();
+      UserGroupInformation.getLoginUser().doAs(
+          new PrivilegedExceptionAction<Object>() {
+            public Object run() throws IOException {
+             p.getFileSystem(conf).delete(p, true);
+             return null;
+            }
+          });
+    }
+
+    @Override
+    public String toString() {
+      final Path p = getPathForCleanup();
+      return (null == p) ? "undefined" : p.toString();
     }
   }
 
   /**
    * Adds the paths to the queue of paths to be deleted by cleanupThread.
    */
-  void addToQueue(PathDeletionContext... contexts) {
+  public void addToQueue(PathDeletionContext... contexts) {
     cleanupThread.addToQueue(contexts);
   }
 
-  protected static boolean deletePath(PathDeletionContext context)
-            throws IOException {
-    context.enablePathForCleanup();
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Trying to delete " + context.fullPath);
-    }
-    if (context.fs.exists(new Path(context.fullPath))) {
-      return context.fs.delete(new Path(context.fullPath), true);
-    }
-    return true;
-  }
-
   // currently used by tests only
   protected boolean isQueueEmpty() {
     return (cleanupThread.queue.size() == 0);
@@ -128,18 +128,16 @@ class CleanupQueue {
       while (true) {
         try {
           context = queue.take();
+          context.deletePath();
           // delete the path.
-          if (!deletePath(context)) {
-            LOG.warn("CleanupThread:Unable to delete path " + context.fullPath);
-          }
-          else if (LOG.isDebugEnabled()) {
-            LOG.debug("DELETED " + context.fullPath);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("DELETED " + context);
           }
         } catch (InterruptedException t) {
-          LOG.warn("Interrupted deletion of " + context.fullPath);
+          LOG.warn("Interrupted deletion of " + context);
           return;
-        } catch (Exception e) {
-          LOG.warn("Error deleting path " + context.fullPath + ": " + e);
+        } catch (Throwable e) {
+          LOG.warn("Error deleting path " + context, e);
         } 
       }
     }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/DefaultTaskController.java Tue Mar  8 05:56:27 2011
@@ -14,25 +14,27 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapreduce.util.ProcessTree;
-import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
 
 /**
  * The default implementation for controlling tasks.
@@ -43,187 +45,231 @@ import org.apache.hadoop.fs.Path;
  * 
  * <br/>
  * 
+ *  NOTE: This class is internal only class and not intended for users!!
  */
-@InterfaceAudience.Private
 public class DefaultTaskController extends TaskController {
 
   private static final Log LOG = 
       LogFactory.getLog(DefaultTaskController.class);
-  /**
-   * Launch a new JVM for the task.
-   * 
-   * This method launches the new JVM for the task by executing the
-   * the JVM command using the {@link Shell.ShellCommandExecutor}
-   */
-  void launchTaskJVM(TaskController.TaskControllerContext context) 
-                                      throws IOException {
-    initializeTask(context);
-
-    JvmEnv env = context.env;
-    List<String> wrappedCommand = 
-      TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
-          env.logSize, true);
-    ShellCommandExecutor shexec = 
-        new ShellCommandExecutor(wrappedCommand.toArray(new String[0]), 
-                                  env.workDir, env.env);
-    // set the ShellCommandExecutor for later use.
-    context.shExec = shexec;
-    shexec.execute();
-  }
-    
-  /**
-   * Initialize the task environment.
-   * 
-   * Since tasks are launched as the tasktracker user itself, this
-   * method has no action to perform.
-   */
-  void initializeTask(TaskController.TaskControllerContext context) {
-    // The default task controller does not need to set up
-    // any permissions for proper execution.
-    // So this is a dummy method.
-    return;
-  }
-
-  /*
-   * No need to do anything as we don't need to do as we dont need anything
-   * extra from what TaskTracker has done.
-   */
+  private FileSystem fs;
   @Override
-  void initializeJob(JobInitializationContext context) {
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    try {
+      fs = FileSystem.getLocal(conf).getRaw();
+    } catch (IOException ie) {
+      throw new RuntimeException("Failed getting LocalFileSystem", ie);
+    }
   }
 
+  /**
+   * Create all of the directories for the task and launches the child jvm.
+   * @param user the user name
+   * @param attemptId the attempt id
+   * @throws IOException
+   */
   @Override
-  void terminateTask(TaskControllerContext context) {
-    ShellCommandExecutor shexec = context.shExec;
-    if (shexec != null) {
-      Process process = shexec.getProcess();
-      if (Shell.WINDOWS) {
-        // Currently we don't use setsid on WINDOWS. 
-        //So kill the process alone.
-        if (process != null) {
-          process.destroy();
-        }
-      }
-      else { // In addition to the task JVM, kill its subprocesses also.
-        String pid = context.pid;
-        if (pid != null) {
-          if(ProcessTree.isSetsidAvailable) {
-            ProcessTree.terminateProcessGroup(pid);
-          }else {
-            ProcessTree.terminateProcess(pid);
-          }
-        }
+  public int launchTask(String user, 
+                                  String jobId,
+                                  String attemptId,
+                                  List<String> setup,
+                                  List<String> jvmArguments,
+                                  File currentWorkDirectory,
+                                  String stdout,
+                                  String stderr) throws IOException {
+    
+    ShellCommandExecutor shExec = null;
+    try {
+      FileSystem localFs = FileSystem.getLocal(getConf());
+      
+      //create the attempt dirs
+      new Localizer(localFs, 
+          getConf().getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY)).
+          initializeAttemptDirs(user, jobId, attemptId);
+      
+      // create the working-directory of the task 
+      if (!currentWorkDirectory.mkdir()) {
+        throw new IOException("Mkdirs failed to create " 
+                    + currentWorkDirectory.toString());
       }
-    }
-  }
-  
-  @Override
-  void killTask(TaskControllerContext context) {
-    ShellCommandExecutor shexec = context.shExec;
-    if (shexec != null) {
-      if (Shell.WINDOWS) {
-        //We don't do send kill process signal in case of windows as 
-        //already we have done a process.destroy() in terminateTaskJVM()
-        return;
+      
+      //mkdir the loglocation
+      String logLocation = TaskLog.getAttemptDir(jobId, attemptId).toString();
+      if (!localFs.mkdirs(new Path(logLocation))) {
+        throw new IOException("Mkdirs failed to create " 
+                   + logLocation);
       }
-      String pid = context.pid;
-      if (pid != null) {
-        if(ProcessTree.isSetsidAvailable) {
-          ProcessTree.killProcessGroup(pid);
-        } else {
-          ProcessTree.killProcess(pid);
-        }
+      //read the configuration for the job
+      FileSystem rawFs = FileSystem.getLocal(getConf()).getRaw();
+      long logSize = 0; //TODO: Ref BUG:2854624
+      // get the JVM command line.
+      String cmdLine = 
+        TaskLog.buildCommandLine(setup, jvmArguments,
+            new File(stdout), new File(stderr), logSize, true);
+
+      // write the command to a file in the
+      // task specific cache directory
+      // TODO copy to user dir
+      Path p = new Path(allocator.getLocalPathForWrite(
+          TaskTracker.getPrivateDirTaskScriptLocation(user, jobId, attemptId),
+          getConf()), COMMAND_FILE);
+
+      String commandFile = writeCommand(cmdLine, rawFs, p);
+      rawFs.setPermission(p, TaskController.TASK_LAUNCH_SCRIPT_PERMISSION);
+      shExec = new ShellCommandExecutor(new String[]{
+          "bash", "-c", commandFile},
+          currentWorkDirectory);
+      shExec.execute();
+    } catch (Exception e) {
+      if (shExec == null) {
+        return -1;
       }
+      int exitCode = shExec.getExitCode();
+      LOG.warn("Exit code from task is : " + exitCode);
+      LOG.info("Output from DefaultTaskController's launchTask follows:");
+      logOutput(shExec.getOutput());
+      return exitCode;
     }
+    return 0;
   }
-
+    
+  /**
+   * This routine initializes the local file system for running a job.
+   * Details:
+   * <ul>
+   * <li>Copies the credentials file from the TaskTracker's private space to
+   * the job's private space </li>
+   * <li>Creates the job work directory and set 
+   * {@link TaskTracker#JOB_LOCAL_DIR} in the configuration</li>
+   * <li>Downloads the job.jar, unjars it, and updates the configuration to 
+   * reflect the localized path of the job.jar</li>
+   * <li>Creates a base JobConf in the job's private space</li>
+   * <li>Sets up the distributed cache</li>
+   * <li>Sets up the user logs directory for the job</li>
+   * </ul>
+   * This method must be invoked in the access control context of the job owner 
+   * user. This is because the distributed cache is also setup here and the 
+   * access to the hdfs files requires authentication tokens in case where 
+   * security is enabled.
+   * @param user the user in question (the job owner)
+   * @param jobid the ID of the job in question
+   * @param credentials the path to the credentials file that the TaskTracker
+   * downloaded
+   * @param jobConf the path to the job configuration file that the TaskTracker
+   * downloaded
+   * @param taskTracker the connection to the task tracker
+   * @throws IOException
+   * @throws InterruptedException
+   */
   @Override
-  void dumpTaskStack(TaskControllerContext context) {
-    ShellCommandExecutor shexec = context.shExec;
-    if (shexec != null) {
-      if (Shell.WINDOWS) {
-        // We don't use signals in Windows.
-        return;
-      }
-      String pid = context.pid;
-      if (pid != null) {
-        // Send SIGQUIT to get a stack dump
-        if (ProcessTree.isSetsidAvailable) {
-          ProcessTree.sigQuitProcessGroup(pid);
-        } else {
-          ProcessTree.sigQuitProcess(pid);
-        }
-      }
-    }
+  public void initializeJob(String user, String jobid, 
+                            Path credentials, Path jobConf, 
+                            TaskUmbilicalProtocol taskTracker,
+                            InetSocketAddress ttAddr
+                            ) throws IOException, InterruptedException {
+    final LocalDirAllocator lDirAlloc = allocator;
+    FileSystem localFs = FileSystem.getLocal(getConf());
+    JobLocalizer localizer = new JobLocalizer((JobConf)getConf(), user, jobid);
+    localizer.createLocalDirs();
+    localizer.createUserDirs();
+    localizer.createJobDirs();
+
+    JobConf jConf = new JobConf(jobConf);
+    localizer.createWorkDir(jConf);
+    //copy the credential file
+    Path localJobTokenFile = lDirAlloc.getLocalPathForWrite(
+        TaskTracker.getLocalJobTokenFile(user, jobid), getConf());
+    FileUtil.copy(
+        localFs, credentials, localFs, localJobTokenFile, false, getConf());
+
+
+    //setup the user logs dir
+    localizer.initializeJobLogDir();
+
+    // Download the job.jar for this job from the system FS
+    // setup the distributed cache
+    // write job acls
+    // write localized config
+    localizer.localizeJobFiles(JobID.forName(jobid), jConf, localJobTokenFile, 
+                               taskTracker);
   }
 
   @Override
-  public void initializeDistributedCacheFile(DistributedCacheFileContext context)
+  public boolean signalTask(String user, int taskPid, Signal signal)
       throws IOException {
-    Path localizedUniqueDir = context.getLocalizedUniqueDir();
+    final int sigpid = TaskController.isSetsidAvailable
+      ? -1 * taskPid
+      : taskPid;
     try {
-      // Setting recursive execute permission on localized dir
-      LOG.info("Doing chmod on localdir :" + localizedUniqueDir);
-      FileUtil.chmod(localizedUniqueDir.toString(), "+x", true);
-    } catch (InterruptedException ie) {
-      LOG.warn("Exception in doing chmod on" + localizedUniqueDir, ie);
-      throw new IOException(ie);
+      sendSignal(sigpid, Signal.NULL);
+    } catch (ExitCodeException e) {
+      return false;
     }
+    try {
+      sendSignal(sigpid, signal);
+    } catch (IOException e) {
+      try {
+        sendSignal(sigpid, Signal.NULL);
+      } catch (IOException ignore) {
+        return false;
+      }
+      throw e;
+    }
+    return true;
   }
 
-  @Override
-  public void initializeUser(InitializationContext context) {
-    // Do nothing.
-  }
-  
-  @Override
-  void runDebugScript(DebugScriptContext context) throws IOException {
-    List<String>  wrappedCommand = TaskLog.captureDebugOut(context.args, 
-        context.stdout);
-    // run the script.
-    ShellCommandExecutor shexec = 
-      new ShellCommandExecutor(wrappedCommand.toArray(new String[0]), context.workDir);
+  /**
+   * Send a specified signal to the specified pid
+   *
+   * @param pid the pid of the process [group] to signal.
+   * @param signal signal to send
+   * (for logging).
+   */
+  protected void sendSignal(int pid, Signal signal) throws IOException {
+    ShellCommandExecutor shexec = null;
+      String[] arg = { "kill", "-" + signal.getValue(), Integer.toString(pid) };
+      shexec = new ShellCommandExecutor(arg);
     shexec.execute();
-    int exitCode = shexec.getExitCode();
-    if (exitCode != 0) {
-      throw new IOException("Task debug script exit with nonzero status of " 
-          + exitCode + ".");
-    }
   }
 
   /**
-   * Enables the task for cleanup by changing permissions of the specified path
-   * in the local filesystem
+   * Delete the user's files under all of the task tracker root directories.
+   * @param user the user name
+   * @param subDir the path relative to base directories
+   * @param baseDirs the base directories (absolute paths)
+   * @throws IOException
    */
   @Override
-  void enableTaskForCleanup(PathDeletionContext context)
-         throws IOException {
-    enablePathForCleanup(context);
+  public void deleteAsUser(String user, 
+                           String subDir, 
+                           String... baseDirs) throws IOException {
+    if (baseDirs == null || baseDirs.length == 0) {
+      LOG.info("Deleting absolute path : " + subDir);
+      fs.delete(new Path(subDir), true);
+      return;
+    }
+    for (String baseDir : baseDirs) {
+      LOG.info("Deleting path : " + baseDir + Path.SEPARATOR + subDir);
+      fs.delete(new Path(baseDir + Path.SEPARATOR + subDir), true);
+    }
   }
   
   /**
-   * Enables the job for cleanup by changing permissions of the specified path
-   * in the local filesystem
+   * Delete the user's files under the userlogs directory.
+   * @param user the user to work as
+   * @param subDir the path under the userlogs directory.
+   * @throws IOException
    */
   @Override
-  void enableJobForCleanup(PathDeletionContext context)
-         throws IOException {
-    enablePathForCleanup(context);
+  public void deleteLogAsUser(String user, 
+                              String subDir) throws IOException {
+    Path dir = new Path(TaskLog.getUserLogDir().getAbsolutePath(), subDir);
+    fs.delete(dir, true);
   }
-  
-  /**
-   * Enables the path for cleanup by changing permissions of the specified path
-   * in the local filesystem
-   */
-  private void enablePathForCleanup(PathDeletionContext context)
-         throws IOException {
-    try {
-      FileUtil.chmod(context.fullPath, "u+rwx", true);
-    } catch(InterruptedException e) {
-      LOG.warn("Interrupted while setting permissions for " + context.fullPath +
-          " for deletion.");
-    } catch(IOException ioe) {
-      LOG.warn("Unable to change permissions of " + context.fullPath);
-    }
+
+  @Override
+  public void setup(LocalDirAllocator allocator) {
+    this.allocator = allocator;
   }
+  
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/IsolationRunner.java Tue Mar  8 05:56:27 2011
@@ -121,6 +121,13 @@ public class IsolationRunner {
         SortedRanges.Range range) throws IOException {
       LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
     }
+
+    @Override
+    public void 
+    updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId,
+                                       long[] sizes){
+      // NOTHING
+    }
   }
   
   private ClassLoader makeClassLoader(JobConf conf, 
@@ -181,9 +188,15 @@ public class IsolationRunner {
     // setup the local and user working directories
     FileSystem local = FileSystem.getLocal(conf);
     LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
+    Path workDirName;
+    boolean workDirExists = lDirAlloc.ifExists(MRConstants.WORKDIR, conf);
+    if (workDirExists) {
+      workDirName = TaskRunner.formWorkDir(lDirAlloc, conf);
+    } else {
+      workDirName = lDirAlloc.getLocalPathForWrite(MRConstants.WORKDIR, conf);
+    }
 
-    File workDirName = TaskRunner.formWorkDir(lDirAlloc, taskId, false, conf);
-    local.setWorkingDirectory(new Path(workDirName.toString()));
+    local.setWorkingDirectory(workDirName);
     FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
     
     // set up a classloader with the right classpath

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Mar  8 05:56:27 2011
@@ -477,6 +477,11 @@ public class JobInProgress {
       this.submitHostName = conf.getJobSubmitHostName();
       this.submitHostAddress = conf.getJobSubmitHostAddress();
 
+      this.nonLocalMaps = new LinkedList<TaskInProgress>();
+      this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+      this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+      this.nonRunningReduces = new LinkedList<TaskInProgress>();
+      this.runningReduces = new LinkedHashSet<TaskInProgress>();
       this.slowTaskThreshold = Math.max(0.0f, conf.getFloat(
           MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f));
       this.speculativeCap = conf.getFloat(MRJobConfig.SPECULATIVECAP, 0.1f);
@@ -3451,8 +3456,8 @@ public class JobInProgress {
          }
 
          Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
-         new CleanupQueue().addToQueue(new PathDeletionContext(
-             jobtracker.getFileSystem(), tempDir.toUri().getPath())); 
+         CleanupQueue.getInstance().addToQueue(
+             new PathDeletionContext(tempDir, conf)); 
        } catch (IOException e) {
          LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
        }

Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobLocalizer.java?rev=1079211&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobLocalizer.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobLocalizer.java Tue Mar  8 05:56:27 2011
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.RunJar;
+
+/**
+ * Internal class responsible for initializing the job, not intended for users.
+ * Creates the following hierarchy:
+ *   <li>$mapred.local.dir/taskTracker/$user</li>
+ *   <li>$mapred.local.dir/taskTracker/$user/jobcache</li>
+ *   <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/work</li>
+ *   <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/jars</li>
+ *   <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/jars/job.jar</li>
+ *   <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/job.xml</li>
+ *   <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/jobToken</li>
+ *   <li>$mapred.local.dir/taskTracker/$user/distcache</li>
+ */
+public class JobLocalizer {
+
+  static final Log LOG = LogFactory.getLog(JobLocalizer.class);
+
+  private static final FsPermission urwx =
+    FsPermission.createImmutable((short) 0700);
+  private static final FsPermission urwx_gx =
+    FsPermission.createImmutable((short) 0710);
+  private static final FsPermission urw_gr =
+    FsPermission.createImmutable((short) 0640);
+
+  private final String user;
+  private final String jobid;
+  private final FileSystem lfs;
+  private final List<Path> localDirs;
+  private final LocalDirAllocator lDirAlloc;
+  private final JobConf ttConf;
+
+  private final String JOBDIR;
+  private final String DISTDIR;
+  private final String WORKDIR;
+  private final String JARDST;
+  private final String JOBCONF;
+  private final String JOBTOKEN;
+  private static final String JOB_LOCAL_CTXT = "mapred.job.local.dir";
+
+  public JobLocalizer(JobConf ttConf, String user, String jobid)
+      throws IOException {
+    this(ttConf, user, jobid,
+        ttConf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+  }
+
+  public JobLocalizer(JobConf ttConf, String user, String jobid,
+      String... localDirs) throws IOException {
+    if (null == user) {
+      throw new IOException("Cannot initialize for null user");
+    }
+    this.user = user;
+    if (null == jobid) {
+      throw new IOException("Cannot initialize for null jobid");
+    }
+    this.jobid = jobid;
+    this.ttConf = ttConf;
+    lfs = FileSystem.getLocal(ttConf).getRaw();
+    this.localDirs = createPaths(user, localDirs);
+    ttConf.setStrings(JOB_LOCAL_CTXT, localDirs);
+    Collections.shuffle(this.localDirs);
+    lDirAlloc = new LocalDirAllocator(JOB_LOCAL_CTXT);
+    JOBDIR = TaskTracker.JOBCACHE + Path.SEPARATOR + jobid;
+    DISTDIR = JOBDIR + "/" + TaskTracker.DISTCACHEDIR;
+    WORKDIR = JOBDIR + "/work";
+    JARDST = JOBDIR + "/" + TaskTracker.JARSDIR + "/job.jar";
+    JOBCONF = JOBDIR + "/" + TaskTracker.JOBFILE;
+    JOBTOKEN = JOBDIR + "/" + TaskTracker.JOB_TOKEN_FILE;
+  }
+
+  private static List<Path> createPaths(String user, final String[] str)
+      throws IOException {
+    if (null == str || 0 == str.length) {
+      throw new IOException("mapred.local.dir contains no entries");
+    }
+    final List<Path> ret = new ArrayList<Path>(str.length);
+    for (int i = 0; i < str.length; ++i) {
+      final Path p = new Path(str[i], TaskTracker.getUserDir(user));
+      ret.add(p);
+      str[i] = p.toString();
+    }
+    return ret;
+  }
+
+  public void createLocalDirs() throws IOException {
+    boolean userDirStatus = false;
+    // create all directories as rwx------
+    for (Path localDir : localDirs) {
+      // create $mapred.local.dir/taskTracker/$user
+      if (!lfs.mkdirs(localDir, urwx)) {
+        LOG.warn("Unable to create the user directory : " + localDir);
+        continue;
+      }
+      userDirStatus = true;
+    }
+    if (!userDirStatus) {
+      throw new IOException("Not able to initialize user directories "
+          + "in any of the configured local directories for user " + user);
+    }
+  }
+
+  /**
+   * Initialize the local directories for a particular user on this TT. This
+   * involves creation and setting permissions of the following directories
+   * <ul>
+   * <li>$mapred.local.dir/taskTracker/$user</li>
+   * <li>$mapred.local.dir/taskTracker/$user/jobcache</li>
+   * <li>$mapred.local.dir/taskTracker/$user/distcache</li>
+   * </ul>
+   */
+  public void createUserDirs() throws IOException {
+    LOG.info("Initializing user " + user + " on this TT.");
+
+    boolean jobCacheDirStatus = false;
+    boolean distributedCacheDirStatus = false;
+
+    // create all directories as rwx------
+    for (Path localDir : localDirs) {
+      // create $mapred.local.dir/taskTracker/$user/jobcache
+      final Path jobDir =
+        new Path(localDir, TaskTracker.JOBCACHE);
+      if (!lfs.mkdirs(jobDir, urwx)) {
+        LOG.warn("Unable to create job cache directory : " + jobDir);
+      } else {
+        jobCacheDirStatus = true;
+      }
+      // create $mapred.local.dir/taskTracker/$user/distcache
+      final Path distDir =
+        new Path(localDir, TaskTracker.DISTCACHEDIR);
+      if (!lfs.mkdirs(distDir, urwx)) {
+        LOG.warn("Unable to create distributed-cache directory : " + distDir);
+      } else {
+        distributedCacheDirStatus = true;
+      }
+    }
+    if (!jobCacheDirStatus) {
+      throw new IOException("Not able to initialize job-cache directories "
+          + "in any of the configured local directories for user " + user);
+    }
+    if (!distributedCacheDirStatus) {
+      throw new IOException(
+          "Not able to initialize distributed-cache directories "
+              + "in any of the configured local directories for user "
+              + user);
+    }
+  }
+
+  /**
+   * Prepare the job directories for a given job. To be called by the job
+   * localization code, only if the job is not already localized.
+   * <br>
+   * Here, we set 700 permissions on the job directories created on all disks.
+   * This we do so as to avoid any misuse by other users till the time
+   * {@link TaskController#initializeJob} is run at a
+   * later time to set proper private permissions on the job directories. <br>
+   */
+  public void createJobDirs() throws IOException {
+    boolean initJobDirStatus = false;
+    for (Path localDir : localDirs) {
+      Path fullJobDir = new Path(localDir, JOBDIR);
+      if (lfs.exists(fullJobDir)) {
+        // this will happen on a partial execution of localizeJob. Sometimes
+        // copying job.xml to the local disk succeeds but copying job.jar might
+        // throw out an exception. We should clean up and then try again.
+        lfs.delete(fullJobDir, true);
+      }
+      // create $mapred.local.dir/taskTracker/$user/jobcache/$jobid
+      if (!lfs.mkdirs(fullJobDir, urwx)) {
+        LOG.warn("Not able to create job directory " + fullJobDir.toString());
+      } else {
+        initJobDirStatus = true;
+      }
+    }
+    if (!initJobDirStatus) {
+      throw new IOException("Not able to initialize job directories "
+          + "in any of the configured local directories for job "
+          + jobid.toString());
+    }
+  }
+
+  /**
+   * Create job log directory and set appropriate permissions for the directory.
+   */
+  public void initializeJobLogDir() throws IOException {
+    Path jobUserLogDir = new Path(TaskLog.getJobDir(jobid).toURI().toString());
+    if (!lfs.mkdirs(jobUserLogDir, urwx_gx)) {
+      throw new IOException(
+          "Could not create job user log directory: " + jobUserLogDir);
+    }
+  }
+
+  /**
+   * Download the job jar file from FS to the local file system and unjar it.
+   * Set the local jar file in the passed configuration.
+   *
+   * @param localJobConf
+   * @throws IOException
+   */
+  private void localizeJobJarFile(JobConf localJobConf)
+      throws IOException, InterruptedException {
+    // copy Jar file to the local FS and unjar it.
+    String jarFile = localJobConf.getJar();
+    FileStatus status = null;
+    long jarFileSize = -1;
+    if (jarFile != null) {
+      Path jarFilePath = new Path(jarFile);
+      FileSystem userFs = jarFilePath.getFileSystem(localJobConf);
+      try {
+        status = userFs.getFileStatus(jarFilePath);
+        jarFileSize = status.getLen();
+      } catch (FileNotFoundException fe) {
+        jarFileSize = -1;
+      }
+      // Here we check for five times the size of jarFileSize to accommodate for
+      // unjarring the jar file in the jars directory
+      Path localJarFile =
+        lDirAlloc.getLocalPathForWrite(JARDST, 5 * jarFileSize, ttConf);
+
+      //Download job.jar
+      userFs.copyToLocalFile(jarFilePath, localJarFile);
+      localJobConf.setJar(localJarFile.toString());
+      // Also un-jar the job.jar files. We un-jar it so that classes inside
+      // sub-directories, for e.g., lib/, classes/ are available on class-path
+      RunJar.unJar(new File(localJarFile.toString()),
+          new File(localJarFile.getParent().toString()));
+      FileUtil.chmod(localJarFile.getParent().toString(), "ugo+rx", true);
+    }
+  }
+
+  /**
+   * The permissions to use for the private distributed cache objects.
+   * It is already protected by the user directory, so keep the group and other
+   * the same so that LocalFileSystem will use the java File methods to
+   * set permission.
+   */
+  private static final FsPermission privateCachePerms =
+    FsPermission.createImmutable((short) 0755);
+  
+  /**
+   * Given a list of objects, download each one.
+   * @param conf the job's configuration
+   * @param sources the list of objects to download from
+   * @param dests the list of paths to download them to
+   * @param times the desired modification times
+   * @param isPublic are the objects in the public cache?
+   * @param isArchive are these archive files?
+   * @throws IOException
+   * @return for archives, return the list of each of the sizes.
+   */
+  private static long[] downloadPrivateCacheObjects(Configuration conf,
+                                             URI[] sources,
+                                             Path[] dests,
+                                             long[] times,
+                                             boolean[] isPublic,
+                                             boolean isArchive
+                                             ) throws IOException,
+                                                      InterruptedException {
+    if (null == sources && null == dests && null == times && null == isPublic) {
+      return null;
+    }
+    if (sources.length != dests.length ||
+        sources.length != times.length ||
+        sources.length != isPublic.length) {
+      throw new IOException("Distributed cache entry arrays have different " +
+                            "lengths: " + sources.length + ", " + dests.length +
+                            ", " + times.length + ", " + isPublic.length);
+    }
+    long[] result = new long[sources.length];
+    for(int i=0; i < sources.length; i++) {
+      // public objects are already downloaded by the Task Tracker, we
+      // only need to handle the private ones here
+      if (!isPublic[i]) {
+        result[i] = 
+          TrackerDistributedCacheManager.downloadCacheObject(conf, sources[i], 
+                                                             dests[i], 
+                                                             times[i], 
+                                                             isArchive, 
+                                                             privateCachePerms);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Download the parts of the distributed cache that are private.
+   * @param conf the job's configuration
+   * @throws IOException
+   * @return the size of the archive objects
+   */
+  public static long[] downloadPrivateCache(Configuration conf)
+      throws InterruptedException, IOException {
+    downloadPrivateCacheObjects(conf,
+                                DistributedCache.getCacheFiles(conf),
+                                DistributedCache.getLocalCacheFiles(conf),
+                                DistributedCache.getFileTimestamps(conf),
+                                TrackerDistributedCacheManager.
+                                  getFileVisibilities(conf),
+                                false);
+    return 
+      downloadPrivateCacheObjects(conf,
+                                  DistributedCache.getCacheArchives(conf),
+                                  DistributedCache.getLocalCacheArchives(conf),
+                                  DistributedCache.getArchiveTimestamps(conf),
+                                  TrackerDistributedCacheManager.
+                                    getArchiveVisibilities(conf),
+                                  true);
+  }
+
+  public void localizeJobFiles(JobID jobid, JobConf jConf,
+      Path localJobTokenFile, TaskUmbilicalProtocol taskTracker)
+      throws IOException, InterruptedException {
+    localizeJobFiles(jobid, jConf,
+        lDirAlloc.getLocalPathForWrite(JOBCONF, ttConf), localJobTokenFile,
+        taskTracker);
+  }
+
+  public void localizeJobFiles(final JobID jobid, JobConf jConf,
+      Path localJobFile, Path localJobTokenFile,
+      final TaskUmbilicalProtocol taskTracker) 
+  throws IOException, InterruptedException {
+    // Download the job.jar for this job from the system FS
+    localizeJobJarFile(jConf);
+
+    jConf.set(JOB_LOCAL_CTXT, ttConf.get(JOB_LOCAL_CTXT));
+
+    //update the config some more
+    jConf.set(TokenCache.JOB_TOKENS_FILENAME, localJobTokenFile.toString());
+    jConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, 
+        ttConf.get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+    TaskTracker.resetNumTasksPerJvm(jConf);
+
+    //setup the distributed cache
+    final long[] sizes = downloadPrivateCache(jConf);
+    if (sizes != null) {
+      //the following doAs is required because the DefaultTaskController
+      //calls the localizeJobFiles method in the context of the TaskTracker
+      //process. The JVM authorization check would fail without this
+      //doAs. In the LinuxTC case, this doesn't harm.
+      UserGroupInformation ugi = 
+        UserGroupInformation.createRemoteUser(jobid.toString());
+      ugi.doAs(new PrivilegedExceptionAction<Object>() { 
+        public Object run() throws IOException {
+          taskTracker.updatePrivateDistributedCacheSizes(jobid, sizes);
+          return null;
+        }
+      });
+    }
+
+    // Create job-acls.xml file in job userlog dir and write the needed
+    // info for authorization of users for viewing task logs of this job.
+    writeJobACLs(jConf, new Path(TaskLog.getJobDir(jobid).toURI().toString()));
+
+    //write the updated jobConf file in the job directory
+    JobLocalizer.writeLocalJobFile(localJobFile, jConf);
+  }
+
+  /**
+   *  Creates job-acls.xml under the given directory logDir and writes
+   *  job-view-acl, queue-admins-acl, jobOwner name and queue name into this
+   *  file.
+   *  queue name is the queue to which the job was submitted to.
+   *  queue-admins-acl is the queue admins ACL of the queue to which this
+   *  job was submitted to.
+   * @param conf   job configuration
+   * @param logDir job userlog dir
+   * @throws IOException
+   */
+  private void writeJobACLs(JobConf conf, Path logDir) throws IOException {
+    JobConf aclConf = new JobConf(false);
+
+    // set the job view acl in aclConf
+    String jobViewACL = conf.get(JobContext.JOB_ACL_VIEW_JOB, " ");
+    aclConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACL);
+
+    // set the job queue name in aclConf
+    String queue = conf.getQueueName();
+    aclConf.setQueueName(queue);
+
+    // set the queue admins acl in aclConf
+    String qACLName = QueueManager.toFullPropertyName(queue,
+        QueueACL.ADMINISTER_JOBS.getAclName());
+    String queueAdminsACL = conf.get(qACLName, " ");
+    aclConf.set(qACLName, queueAdminsACL);
+
+    // set jobOwner as user.name in aclConf
+    aclConf.set("user.name", user);
+
+    OutputStream out = null;
+    Path aclFile = new Path(logDir, TaskTracker.jobACLsFile);
+    try {
+      out = lfs.create(aclFile);
+      aclConf.writeXml(out);
+    } finally {
+      IOUtils.cleanup(LOG, out);
+    }
+    lfs.setPermission(aclFile, urw_gr);
+  }
+
+  public void createWorkDir(JobConf jConf) throws IOException {
+    // create $mapred.local.dir/taskTracker/$user/jobcache/$jobid/work
+    final Path workDir = lDirAlloc.getLocalPathForWrite(WORKDIR, ttConf);
+    if (!lfs.mkdirs(workDir)) {
+      throw new IOException("Mkdirs failed to create "
+          + workDir.toString());
+    }
+    jConf.set(TaskTracker.JOB_LOCAL_DIR, workDir.toUri().getPath());
+  }
+
+  public Path findCredentials() throws IOException {
+    return lDirAlloc.getLocalPathToRead(JOBTOKEN, ttConf);
+  }
+
+  public int runSetup(String user, String jobid, Path localJobTokenFile,
+                      TaskUmbilicalProtocol taskTracker) 
+  throws IOException, InterruptedException {
+    // load user credentials, configuration
+    // ASSUME
+    // let $x = $mapred.local.dir
+    // forall $x, exists $x/$user
+    // exists $x/$user/jobcache/$jobid/job.xml
+    // exists $x/$user/jobcache/$jobid/jobToken
+    // exists $logdir/userlogs/$jobid
+    final Path localJobFile = lDirAlloc.getLocalPathToRead(JOBCONF, ttConf);
+    final JobConf cfgJob = new JobConf(localJobFile);
+    createWorkDir(cfgJob);
+    localizeJobFiles(JobID.forName(jobid), cfgJob, localJobFile,
+        localJobTokenFile, taskTracker);
+
+    // $mapred.local.dir/taskTracker/$user/distcache
+    return 0;
+  }
+
+  public static void main(String[] argv)
+      throws IOException, InterruptedException {
+    // $logdir
+    // let $x = $root/tasktracker for some $mapred.local.dir
+    //   create $x/$user/jobcache/$jobid/work
+    //   fetch  $x/$user/jobcache/$jobid/jars/job.jar
+    //   setup  $x/$user/distcache
+    //   verify $x/distcache
+    //   write  $x/$user/jobcache/$jobid/job.xml
+    final String user = argv[0];
+    final String jobid = argv[1];
+    final InetSocketAddress ttAddr = 
+      new InetSocketAddress(argv[2], Integer.parseInt(argv[3]));
+    final String uid = UserGroupInformation.getCurrentUser().getShortUserName();
+    if (!user.equals(uid)) {
+      LOG.warn("Localization running as " + uid + " not " + user);
+    }
+
+    // Pull in user's tokens to complete setup
+    final JobConf conf = new JobConf();
+    final JobLocalizer localizer =
+      new JobLocalizer(conf, user, jobid);
+    final Path jobTokenFile = localizer.findCredentials();
+    final Credentials creds = TokenCache.loadTokens(
+        jobTokenFile.toUri().toString(), conf);
+    LOG.debug("Loaded tokens from " + jobTokenFile);
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+    for (Token<? extends TokenIdentifier> token : creds.getAllTokens()) {
+      ugi.addToken(token);
+    }
+    
+    UserGroupInformation ugiJob = UserGroupInformation.createRemoteUser(jobid);
+    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(creds);
+    jt.setService(new Text(ttAddr.getAddress().getHostAddress() + ":"
+        + ttAddr.getPort()));
+    ugiJob.addToken(jt);
+
+    final TaskUmbilicalProtocol taskTracker = 
+      ugiJob.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
+        public TaskUmbilicalProtocol run() throws IOException {
+          TaskUmbilicalProtocol taskTracker =
+            (TaskUmbilicalProtocol) RPC.getProxy(TaskUmbilicalProtocol.class,
+                TaskUmbilicalProtocol.versionID,
+                ttAddr, conf);
+          return taskTracker;
+        }
+      });
+    System.exit(
+      ugi.doAs(new PrivilegedExceptionAction<Integer>() {
+        public Integer run() {
+          try {
+            return localizer.runSetup(user, jobid, jobTokenFile, taskTracker);
+          } catch (Throwable e) {
+            e.printStackTrace(System.out);
+            return -1;
+          }
+        }
+      }));
+  }
+
+  /**
+   * Write the task specific job-configuration file.
+   * @throws IOException
+   */
+  public static void writeLocalJobFile(Path jobFile, JobConf conf)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    localFs.delete(jobFile);
+    OutputStream out = null;
+    try {
+      out = FileSystem.create(localFs, jobFile, urw_gr);
+      conf.writeXml(out);
+    } finally {
+      IOUtils.cleanup(LOG, out);
+    }
+  }
+
+}