You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:05:28 UTC

svn commit: r1077343 - in /hadoop/common/branches/branch-0.20-security-patches/src: c++/task-controller/ test/org/apache/hadoop/mapred/

Author: omalley
Date: Fri Mar  4 04:05:28 2011
New Revision: 1077343

URL: http://svn.apache.org/viewvc?rev=1077343&view=rev
Log:
commit 04196367a12b565f4e0854ad4c2275f7d69be39f
Author: Hemanth Yamijala <yh...@yahoo-inc.com>
Date:   Fri Mar 19 19:49:22 2010 +0530

    MAPREDUCE-1609 from https://issues.apache.org/jira/secure/attachment/12439278/MAPREDUCE-1609-20-1.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1609. Fixes a problem with localization of job log
    +    directories when tasktracker is re-initialized that can result
    +    in failed tasks. (Amareshwari Sriramadasu via yhemanth)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.c?rev=1077343&r1=1077342&r2=1077343&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c Fri Mar  4 04:05:28 2011
@@ -387,21 +387,8 @@ static int secure_path(const char *path,
       continue;
     }
 
-    if (should_check_ownership && 
-        (check_ownership(entry->fts_path, uid, gid) != 0)) {
-      fprintf(LOGFILE,
-          "Invalid file path. %s not user/group owned by the tasktracker.\n",
-          entry->fts_path);
-      error_code = -1;
-    } else if (change_owner(entry->fts_path, uid, gid) != 0) {
-      fprintf(LOGFILE, "couldn't change the ownership of %s\n",
-          entry->fts_path);
-      error_code = -3;
-    } else if (change_mode(entry->fts_path, (dir ? dir_mode : file_mode)) != 0) {
-      fprintf(LOGFILE, "couldn't change the permissions of %s\n",
-          entry->fts_path);
-      error_code = -3;
-    }
+    error_code = secure_single_path(entry->fts_path, uid, gid,
+     (dir ? dir_mode : file_mode), should_check_ownership);
   }
   if (fts_close(tree) != 0) {
     fprintf(LOGFILE, "couldn't close file traversal structure:%s.\n",
@@ -411,6 +398,28 @@ static int secure_path(const char *path,
 }
 
 /**
+ * Function to change ownership and permissions of the given path. 
+ * This call sets ownership and permissions just for the path, not recursive.  
+ */
+int secure_single_path(char *path, uid_t uid, gid_t gid,
+    mode_t perm, int should_check_ownership) {
+  int error_code = 0;
+  if (should_check_ownership && 
+      (check_ownership(path, uid, gid) != 0)) {
+    fprintf(LOGFILE,
+      "Invalid file path. %s not user/group owned by the tasktracker.\n", path);
+    error_code = -1;
+  } else if (change_owner(path, uid, gid) != 0) {
+    fprintf(LOGFILE, "couldn't change the ownership of %s\n", path);
+    error_code = -3;
+  } else if (change_mode(path, perm) != 0) {
+    fprintf(LOGFILE, "couldn't change the permissions of %s\n", path);
+    error_code = -3;
+  }
+  return error_code;
+}
+
+/**
  * Function to prepare the attempt directories for the task JVM.
  * This is done by changing the ownership of the attempt directory recursively
  * to the job owner. We do the following:
@@ -541,8 +550,10 @@ int prepare_job_logs(const char *log_dir
   }
 
   gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
-  if (secure_path(job_log_dir, user_detail->pw_uid, tasktracker_gid,
-      permissions, S_ISGID | permissions, 1) != 0) {
+  // job log directory should not be set permissions recursively
+  // because, on tt restart/reinit, it would contain directories of earlier run
+  if (secure_single_path(job_log_dir, user_detail->pw_uid, tasktracker_gid,
+      S_ISGID | permissions, 1) != 0) {
     fprintf(LOGFILE, "Failed to secure the log_dir %s\n", job_log_dir);
     free(job_log_dir);
     return -1;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=1077343&r1=1077342&r2=1077343&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Fri Mar  4 04:05:28 2011
@@ -26,7 +26,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
-import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -58,27 +57,6 @@ public class TestLocalizationWithLinuxTa
 
     super.setUp();
 
-    taskController = new MyLinuxTaskController();
-    String path =
-        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
-    String execPath = path + "/task-controller";
-    ((MyLinuxTaskController) taskController).setTaskControllerExe(execPath);
-    taskController.setConf(trackerFConf);
-    taskController.setup();
-
-    tracker.setTaskController(taskController);
-    tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
-        taskController));
-
-    // Rewrite conf so as to reflect task's correct user name.
-    String ugi =
-        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
-    JobConf jobConf = new JobConf(task.getConf());
-    String user = ugi.split(",")[0];
-    jobConf.setUser(user);
-    File jobConfFile = uploadJobConf(jobConf);
-    task.setConf(jobConf);
-    task.setUser(user);
     taskTrackerUserName = UserGroupInformation.getLoginUser()
                           .getShortUserName();
   }
@@ -95,6 +73,18 @@ public class TestLocalizationWithLinuxTa
     }
   }
 
+  protected TaskController getTaskController() {
+    return new MyLinuxTaskController();
+  }
+
+  protected UserGroupInformation getJobOwner() {
+    String ugi = System
+        .getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+    String[] splits = ugi.split(",");
+    return UserGroupInformation.createUserForTesting(splits[0],
+        new String[] { splits[1] });
+  }
+
   /** @InheritDoc */
   @Override
   public void testTaskControllerSetup() {
@@ -218,7 +208,8 @@ public class TestLocalizationWithLinuxTa
     // check the private permissions of various directories
     List<Path> dirs = new ArrayList<Path>();
     dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(task
-        .getUser(), jobId.toString(), taskId.toString()), trackerFConf));
+        .getUser(), jobId.toString(), taskId.toString(),
+        task.isTaskCleanupTask()), trackerFConf));
     dirs.add(attemptWorkDir);
     dirs.add(new Path(attemptWorkDir, "tmp"));
     dirs.add(new Path(attemptLogFiles[1].getParentFile().getAbsolutePath()));

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077343&r1=1077342&r2=1077343&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar  4 04:05:28 2011
@@ -21,6 +21,8 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.TreeMap;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
 
@@ -36,6 +38,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
 import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
+import org.apache.hadoop.mapred.TaskTracker.RunningJob;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -75,6 +78,8 @@ public class TestTaskTrackerLocalization
   protected File[] attemptLogFiles;
   protected JobConf localizedTaskConf;
   private TaskInProgress tip;
+  private JobConf jobConf;
+  private File jobConfFile;
 
   /**
    * Dummy method in this base class. Only derived classes will define this
@@ -113,12 +118,14 @@ public class TestTaskTrackerLocalization
     trackerFConf.setStrings("mapred.local.dir", localDirs);
 
     // Create the job configuration file. Same as trackerConf in this test.
-    JobConf jobConf = new JobConf(trackerFConf);
+    jobConf = new JobConf(trackerFConf);
     // Set job view ACLs in conf sothat validation of contents of jobACLsFile
     // can be done against this value. Have both users and groups
     String jobViewACLs = "user1,user2, group1,group2";
     jobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
     jobConf.setInt("mapred.userlog.retain.hours", 0);
+    jobConf.setUser(getJobOwner().getShortUserName());
+
     String jtIdentifier = "200907202331";
     jobId = new JobID(jtIdentifier, 1);
 
@@ -127,49 +134,69 @@ public class TestTaskTrackerLocalization
     uploadJobJar(jobConf);
 
     // JobClient uploads the jobConf to the file system.
-    File jobConfFile = uploadJobConf(jobConf);
+    jobConfFile = uploadJobConf(jobConf);
 
     // create jobTokens file
     uploadJobTokensFile();
+    
+    taskTrackerUGI = UserGroupInformation.getCurrentUser();
+    startTracker();
+
+    // Set up the task to be localized
+    taskId =
+      new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0);
+    createTask();
+    // mimic register task
+    // create the tip
+    tip = tracker.new TaskInProgress(task, trackerFConf);
+  }
 
+  private void startTracker() throws IOException {
     // Set up the TaskTracker
     tracker = new TaskTracker();
     tracker.setConf(trackerFConf);
-    tracker.setIndexCache(new IndexCache(trackerFConf));
     tracker.setUserLogManager(new UtilsForTests.InLineUserLogManager(
         trackerFConf));
+    initializeTracker();
+  }
+  
+  private void initializeTracker() throws IOException {
+    tracker.setIndexCache(new IndexCache(trackerFConf));
     tracker.setTaskMemoryManagerEnabledFlag();
 
     // for test case system FS is the local FS
-
     tracker.systemFS = FileSystem.getLocal(trackerFConf);
     tracker.systemDirectory = new Path(TEST_ROOT_DIR.getAbsolutePath());
     tracker.setLocalFileSystem(tracker.systemFS);
     
-    taskTrackerUGI = UserGroupInformation.getCurrentUser();
-
-    // Set up the task to be localized
-    taskId =
-        new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0);
-    task =
-        new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
-    task.setConf(jobConf); // Set conf. Set user name in particular.
-    task.setUser(UserGroupInformation.getCurrentUser().getUserName());
-
+    tracker.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
+    tracker.runningJobs = new TreeMap<JobID, RunningJob>();
+    trackerFConf.deleteLocalFiles(TaskTracker.SUBDIR);
 
-    taskController = new DefaultTaskController();
+    // setup task controller
+    taskController = getTaskController();
     taskController.setConf(trackerFConf);
     taskController.setup();
-
     tracker.setTaskController(taskController);
     tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
                                        taskController));
+  }
 
-    // mimic register task
-    // create the tip
-    tip = tracker.new TaskInProgress(task, trackerFConf);
+  protected TaskController getTaskController() {
+    return new DefaultTaskController();
+  }
+
+  private void createTask()
+      throws IOException {
+    task = new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
+    task.setConf(jobConf); // Set conf. Set user name in particular.
+    task.setUser(jobConf.getUser());
   }
 
+  protected UserGroupInformation getJobOwner() throws IOException {
+    return UserGroupInformation.getCurrentUser();
+  }
+  
   /**
    * @param jobConf
    * @throws IOException
@@ -454,63 +481,7 @@ public class TestTaskTrackerLocalization
     }
     TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
     localizedJobConf = rjob.getJobConf();
-
-    tip.setJobConf(localizedJobConf);
-
-    // ////////// The central method being tested
-    tip.localizeTask(task);
-    // //////////
-
-    // check the functionality of localizeTask
-    for (String dir : trackerFConf.getStrings("mapred.local.dir")) {
-      File attemptDir =
-          new File(dir, TaskTracker.getLocalTaskDir(task.getUser(), jobId
-              .toString(), taskId.toString()));
-      assertTrue("attempt-dir " + attemptDir + " in localDir " + dir
-          + " is not created!!", attemptDir.exists());
-    }
-
-    attemptWorkDir =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
-            task.getUser(), task.getJobID().toString(), task.getTaskID()
-                .toString(), task.isTaskCleanupTask()), trackerFConf);
-    assertTrue("atttempt work dir for " + taskId.toString()
-        + " is not created in any of the configured dirs!!",
-        attemptWorkDir != null);
-
-    TaskRunner runner = task.createRunner(tracker, tip);
-
-    // /////// Few more methods being tested
-    runner.setupChildTaskConfiguration(lDirAlloc);
-    TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
-        localizedJobConf);
-    attemptLogFiles = runner.prepareLogFiles(task.getTaskID());
-
-    // Make sure the task-conf file is created
-    Path localTaskFile =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
-            .getUser(), task.getJobID().toString(), task.getTaskID()
-            .toString(), task.isTaskCleanupTask()), trackerFConf);
-    assertTrue("Task conf file " + localTaskFile.toString()
-        + " is not created!!", new File(localTaskFile.toUri().getPath())
-        .exists());
-
-    // /////// One more method being tested. This happens in child space.
-    localizedTaskConf = new JobConf(localTaskFile);
-    TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
-    // ///////
-
-    // Initialize task via TaskController
-    TaskControllerContext taskContext =
-        new TaskController.TaskControllerContext();
-    taskContext.env =
-        new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
-            .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
-    taskContext.task = task;
-    // /////////// The method being tested
-    taskController.initializeTask(taskContext);
-    // ///////////
-
+    initializeTask();
     checkTaskLocalization();
   }
 
@@ -521,13 +492,14 @@ public class TestTaskTrackerLocalization
         .getStrings("mapred.local.dir")) {
       assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!",
           childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(task
-              .getUser(), jobId.toString(), taskId.toString(), false)));
+              .getUser(), jobId.toString(), taskId.toString(),
+              task.isTaskCleanupTask())));
     }
 
     // Make sure task task.getJobFile is changed and pointed correctly.
     assertTrue(task.getJobFile().endsWith(
         TaskTracker.getTaskConfFile(task.getUser(), jobId.toString(), taskId
-            .toString(), false)));
+            .toString(), task.isTaskCleanupTask())));
 
     // Make sure that the tmp directories are created
     assertTrue("tmp dir is not created in workDir "
@@ -684,36 +656,54 @@ public class TestTaskTrackerLocalization
     testTaskCleanup(false, true);// no needCleanup; jvmReuse
   }
 
-  /**
-   * Validates if task cleanup is done properly
-   */
-  private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
-      throws Exception {
-    // Localize job and localize task.
-    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
-    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
-    localizedJobConf = rjob.getJobConf();
-    if (jvmReuse) {
-      localizedJobConf.setNumTasksToExecutePerJvm(2);
-    }
+  private void initializeTask() throws IOException {
     tip.setJobConf(localizedJobConf);
+
+    // ////////// The central method being tested
     tip.localizeTask(task);
-    Path workDir =
+    // //////////
+
+    // check the functionality of localizeTask
+    for (String dir : trackerFConf.getStrings("mapred.local.dir")) {
+      File attemptDir =
+          new File(dir, TaskTracker.getLocalTaskDir(task.getUser(), jobId
+              .toString(), taskId.toString(), task.isTaskCleanupTask()));
+      assertTrue("attempt-dir " + attemptDir + " in localDir " + dir
+          + " is not created!!", attemptDir.exists());
+    }
+
+    attemptWorkDir =
         lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
             task.getUser(), task.getJobID().toString(), task.getTaskID()
                 .toString(), task.isTaskCleanupTask()), trackerFConf);
+    assertTrue("atttempt work dir for " + taskId.toString()
+        + " is not created in any of the configured dirs!!",
+        attemptWorkDir != null);
+
     TaskRunner runner = task.createRunner(tracker, tip);
     tip.setTaskRunner(runner);
+
+    // /////// Few more methods being tested
     runner.setupChildTaskConfiguration(lDirAlloc);
-    TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+    TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
         localizedJobConf);
-    runner.prepareLogFiles(task.getTaskID());
+    attemptLogFiles = runner.prepareLogFiles(task.getTaskID());
+
+    // Make sure the task-conf file is created
     Path localTaskFile =
         lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
             .getUser(), task.getJobID().toString(), task.getTaskID()
             .toString(), task.isTaskCleanupTask()), trackerFConf);
-    JobConf localizedTaskConf = new JobConf(localTaskFile);
+    assertTrue("Task conf file " + localTaskFile.toString()
+        + " is not created!!", new File(localTaskFile.toUri().getPath())
+        .exists());
+
+    // /////// One more method being tested. This happens in child space.
+    localizedTaskConf = new JobConf(localTaskFile);
     TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
+    // ///////
+
+    // Initialize task via TaskController
     TaskControllerContext taskContext =
         new TaskController.TaskControllerContext();
     taskContext.env =
@@ -722,6 +712,21 @@ public class TestTaskTrackerLocalization
     taskContext.task = task;
     // /////////// The method being tested
     taskController.initializeTask(taskContext);
+    // ///////////
+  }
+
+  /**
+   * Validates if task cleanup is done properly
+   */
+  private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
+      throws Exception {
+    // Localize job and localize task.
+    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
+    if (jvmReuse) {
+      localizedJobConf.setNumTasksToExecutePerJvm(2);
+    }
+    initializeTask();
 
     // TODO: Let the task run and create files.
 
@@ -792,7 +797,6 @@ public class TestTaskTrackerLocalization
     }
     
     // Initialize task dirs
-    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
     tip.setJobConf(localizedJobConf);
     tip.localizeTask(task);
     
@@ -835,4 +839,87 @@ public class TestTaskTrackerLocalization
     assertFalse("Job " + task.getJobID() + " work dir exists after cleanup", 
                 jWorkDirExists);
   }
+  
+  /**
+   * Tests TaskTracker restart after the localization.
+   * 
+   * This tests the following steps:
+   * 
+   * Localize Job, initialize a task.
+   * Then restart the Tracker.
+   * launch a cleanup attempt for the task.
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void testTrackerRestart() throws IOException, InterruptedException {
+    if (!canRun()) {
+      return;
+    }
+
+    // Localize job and localize task.
+    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
+    initializeTask();
+    
+    // imitate tracker restart
+    startTracker();
+    
+    // create a task cleanup attempt
+    createTask();
+    task.setTaskCleanupTask();
+    // register task
+    tip = tracker.new TaskInProgress(task, trackerFConf);
+
+    // localize the job again.
+    rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
+    checkJobLocalization();
+    
+    // localize task cleanup attempt
+    initializeTask();
+    checkTaskLocalization();
+  }
+  
+  /**
+   * Tests TaskTracker re-init after the localization.
+   * 
+   * This tests the following steps:
+   * 
+   * Localize Job, initialize a task.
+   * Then reinit the Tracker.
+   * launch a cleanup attempt for the task.
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void testTrackerReinit() throws IOException, InterruptedException {
+    if (!canRun()) {
+      return;
+    }
+
+    // Localize job and localize task.
+    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
+    initializeTask();
+    
+    // imitate tracker reinit
+    initializeTracker();
+    
+    // create a task cleanup attempt
+    createTask();
+    task.setTaskCleanupTask();
+    // register task
+    tip = tracker.new TaskInProgress(task, trackerFConf);
+
+    // localize the job again.
+    rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
+    checkJobLocalization();
+    
+    // localize task cleanup attempt
+    initializeTask();
+    checkTaskLocalization();
+  }
+
 }