You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this text rendering.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:05:28 UTC
svn commit: r1077343 - in /hadoop/common/branches/branch-0.20-security-patches/src: c++/task-controller/ test/org/apache/hadoop/mapred/
Author: omalley
Date: Fri Mar 4 04:05:28 2011
New Revision: 1077343
URL: http://svn.apache.org/viewvc?rev=1077343&view=rev
Log:
commit 04196367a12b565f4e0854ad4c2275f7d69be39f
Author: Hemanth Yamijala <yh...@yahoo-inc.com>
Date: Fri Mar 19 19:49:22 2010 +0530
MAPREDUCE-1609 from https://issues.apache.org/jira/secure/attachment/12439278/MAPREDUCE-1609-20-1.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1609. Fixes a problem with localization of job log
+ directories when tasktracker is re-initialized that can result
+ in failed tasks. (Amareshwari Sriramadasu via yhemanth)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.c?rev=1077343&r1=1077342&r2=1077343&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c Fri Mar 4 04:05:28 2011
@@ -387,21 +387,8 @@ static int secure_path(const char *path,
continue;
}
- if (should_check_ownership &&
- (check_ownership(entry->fts_path, uid, gid) != 0)) {
- fprintf(LOGFILE,
- "Invalid file path. %s not user/group owned by the tasktracker.\n",
- entry->fts_path);
- error_code = -1;
- } else if (change_owner(entry->fts_path, uid, gid) != 0) {
- fprintf(LOGFILE, "couldn't change the ownership of %s\n",
- entry->fts_path);
- error_code = -3;
- } else if (change_mode(entry->fts_path, (dir ? dir_mode : file_mode)) != 0) {
- fprintf(LOGFILE, "couldn't change the permissions of %s\n",
- entry->fts_path);
- error_code = -3;
- }
+ error_code = secure_single_path(entry->fts_path, uid, gid,
+ (dir ? dir_mode : file_mode), should_check_ownership);
}
if (fts_close(tree) != 0) {
fprintf(LOGFILE, "couldn't close file traversal structure:%s.\n",
@@ -411,6 +398,28 @@ static int secure_path(const char *path,
}
/**
+ * Function to change ownership and permissions of the given path.
+ * This call sets ownership and permissions just for the path, not recursive.
+ */
+int secure_single_path(char *path, uid_t uid, gid_t gid,
+ mode_t perm, int should_check_ownership) {
+ int error_code = 0;
+ if (should_check_ownership &&
+ (check_ownership(path, uid, gid) != 0)) {
+ fprintf(LOGFILE,
+ "Invalid file path. %s not user/group owned by the tasktracker.\n", path);
+ error_code = -1;
+ } else if (change_owner(path, uid, gid) != 0) {
+ fprintf(LOGFILE, "couldn't change the ownership of %s\n", path);
+ error_code = -3;
+ } else if (change_mode(path, perm) != 0) {
+ fprintf(LOGFILE, "couldn't change the permissions of %s\n", path);
+ error_code = -3;
+ }
+ return error_code;
+}
+
+/**
* Function to prepare the attempt directories for the task JVM.
* This is done by changing the ownership of the attempt directory recursively
* to the job owner. We do the following:
@@ -541,8 +550,10 @@ int prepare_job_logs(const char *log_dir
}
gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
- if (secure_path(job_log_dir, user_detail->pw_uid, tasktracker_gid,
- permissions, S_ISGID | permissions, 1) != 0) {
+ // job log directory should not be set permissions recursively
+ // because, on tt restart/reinit, it would contain directories of earlier run
+ if (secure_single_path(job_log_dir, user_detail->pw_uid, tasktracker_gid,
+ S_ISGID | permissions, 1) != 0) {
fprintf(LOGFILE, "Failed to secure the log_dir %s\n", job_log_dir);
free(job_log_dir);
return -1;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=1077343&r1=1077342&r2=1077343&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Fri Mar 4 04:05:28 2011
@@ -26,7 +26,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
-import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.security.UserGroupInformation;
/**
@@ -58,27 +57,6 @@ public class TestLocalizationWithLinuxTa
super.setUp();
- taskController = new MyLinuxTaskController();
- String path =
- System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
- String execPath = path + "/task-controller";
- ((MyLinuxTaskController) taskController).setTaskControllerExe(execPath);
- taskController.setConf(trackerFConf);
- taskController.setup();
-
- tracker.setTaskController(taskController);
- tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
- taskController));
-
- // Rewrite conf so as to reflect task's correct user name.
- String ugi =
- System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
- JobConf jobConf = new JobConf(task.getConf());
- String user = ugi.split(",")[0];
- jobConf.setUser(user);
- File jobConfFile = uploadJobConf(jobConf);
- task.setConf(jobConf);
- task.setUser(user);
taskTrackerUserName = UserGroupInformation.getLoginUser()
.getShortUserName();
}
@@ -95,6 +73,18 @@ public class TestLocalizationWithLinuxTa
}
}
+ protected TaskController getTaskController() {
+ return new MyLinuxTaskController();
+ }
+
+ protected UserGroupInformation getJobOwner() {
+ String ugi = System
+ .getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+ String[] splits = ugi.split(",");
+ return UserGroupInformation.createUserForTesting(splits[0],
+ new String[] { splits[1] });
+ }
+
/** @InheritDoc */
@Override
public void testTaskControllerSetup() {
@@ -218,7 +208,8 @@ public class TestLocalizationWithLinuxTa
// check the private permissions of various directories
List<Path> dirs = new ArrayList<Path>();
dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(task
- .getUser(), jobId.toString(), taskId.toString()), trackerFConf));
+ .getUser(), jobId.toString(), taskId.toString(),
+ task.isTaskCleanupTask()), trackerFConf));
dirs.add(attemptWorkDir);
dirs.add(new Path(attemptWorkDir, "tmp"));
dirs.add(new Path(attemptLogFiles[1].getParentFile().getAbsolutePath()));
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077343&r1=1077342&r2=1077343&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar 4 04:05:28 2011
@@ -21,6 +21,8 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.TreeMap;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
@@ -36,6 +38,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
+import org.apache.hadoop.mapred.TaskTracker.RunningJob;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
import org.apache.hadoop.mapreduce.JobContext;
@@ -75,6 +78,8 @@ public class TestTaskTrackerLocalization
protected File[] attemptLogFiles;
protected JobConf localizedTaskConf;
private TaskInProgress tip;
+ private JobConf jobConf;
+ private File jobConfFile;
/**
* Dummy method in this base class. Only derived classes will define this
@@ -113,12 +118,14 @@ public class TestTaskTrackerLocalization
trackerFConf.setStrings("mapred.local.dir", localDirs);
// Create the job configuration file. Same as trackerConf in this test.
- JobConf jobConf = new JobConf(trackerFConf);
+ jobConf = new JobConf(trackerFConf);
// Set job view ACLs in conf sothat validation of contents of jobACLsFile
// can be done against this value. Have both users and groups
String jobViewACLs = "user1,user2, group1,group2";
jobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
jobConf.setInt("mapred.userlog.retain.hours", 0);
+ jobConf.setUser(getJobOwner().getShortUserName());
+
String jtIdentifier = "200907202331";
jobId = new JobID(jtIdentifier, 1);
@@ -127,49 +134,69 @@ public class TestTaskTrackerLocalization
uploadJobJar(jobConf);
// JobClient uploads the jobConf to the file system.
- File jobConfFile = uploadJobConf(jobConf);
+ jobConfFile = uploadJobConf(jobConf);
// create jobTokens file
uploadJobTokensFile();
+
+ taskTrackerUGI = UserGroupInformation.getCurrentUser();
+ startTracker();
+
+ // Set up the task to be localized
+ taskId =
+ new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0);
+ createTask();
+ // mimic register task
+ // create the tip
+ tip = tracker.new TaskInProgress(task, trackerFConf);
+ }
+ private void startTracker() throws IOException {
// Set up the TaskTracker
tracker = new TaskTracker();
tracker.setConf(trackerFConf);
- tracker.setIndexCache(new IndexCache(trackerFConf));
tracker.setUserLogManager(new UtilsForTests.InLineUserLogManager(
trackerFConf));
+ initializeTracker();
+ }
+
+ private void initializeTracker() throws IOException {
+ tracker.setIndexCache(new IndexCache(trackerFConf));
tracker.setTaskMemoryManagerEnabledFlag();
// for test case system FS is the local FS
-
tracker.systemFS = FileSystem.getLocal(trackerFConf);
tracker.systemDirectory = new Path(TEST_ROOT_DIR.getAbsolutePath());
tracker.setLocalFileSystem(tracker.systemFS);
- taskTrackerUGI = UserGroupInformation.getCurrentUser();
-
- // Set up the task to be localized
- taskId =
- new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0);
- task =
- new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
- task.setConf(jobConf); // Set conf. Set user name in particular.
- task.setUser(UserGroupInformation.getCurrentUser().getUserName());
-
+ tracker.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
+ tracker.runningJobs = new TreeMap<JobID, RunningJob>();
+ trackerFConf.deleteLocalFiles(TaskTracker.SUBDIR);
- taskController = new DefaultTaskController();
+ // setup task controller
+ taskController = getTaskController();
taskController.setConf(trackerFConf);
taskController.setup();
-
tracker.setTaskController(taskController);
tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
taskController));
+ }
- // mimic register task
- // create the tip
- tip = tracker.new TaskInProgress(task, trackerFConf);
+ protected TaskController getTaskController() {
+ return new DefaultTaskController();
+ }
+
+ private void createTask()
+ throws IOException {
+ task = new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
+ task.setConf(jobConf); // Set conf. Set user name in particular.
+ task.setUser(jobConf.getUser());
}
+ protected UserGroupInformation getJobOwner() throws IOException {
+ return UserGroupInformation.getCurrentUser();
+ }
+
/**
* @param jobConf
* @throws IOException
@@ -454,63 +481,7 @@ public class TestTaskTrackerLocalization
}
TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
localizedJobConf = rjob.getJobConf();
-
- tip.setJobConf(localizedJobConf);
-
- // ////////// The central method being tested
- tip.localizeTask(task);
- // //////////
-
- // check the functionality of localizeTask
- for (String dir : trackerFConf.getStrings("mapred.local.dir")) {
- File attemptDir =
- new File(dir, TaskTracker.getLocalTaskDir(task.getUser(), jobId
- .toString(), taskId.toString()));
- assertTrue("attempt-dir " + attemptDir + " in localDir " + dir
- + " is not created!!", attemptDir.exists());
- }
-
- attemptWorkDir =
- lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
- task.getUser(), task.getJobID().toString(), task.getTaskID()
- .toString(), task.isTaskCleanupTask()), trackerFConf);
- assertTrue("atttempt work dir for " + taskId.toString()
- + " is not created in any of the configured dirs!!",
- attemptWorkDir != null);
-
- TaskRunner runner = task.createRunner(tracker, tip);
-
- // /////// Few more methods being tested
- runner.setupChildTaskConfiguration(lDirAlloc);
- TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
- localizedJobConf);
- attemptLogFiles = runner.prepareLogFiles(task.getTaskID());
-
- // Make sure the task-conf file is created
- Path localTaskFile =
- lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
- .getUser(), task.getJobID().toString(), task.getTaskID()
- .toString(), task.isTaskCleanupTask()), trackerFConf);
- assertTrue("Task conf file " + localTaskFile.toString()
- + " is not created!!", new File(localTaskFile.toUri().getPath())
- .exists());
-
- // /////// One more method being tested. This happens in child space.
- localizedTaskConf = new JobConf(localTaskFile);
- TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
- // ///////
-
- // Initialize task via TaskController
- TaskControllerContext taskContext =
- new TaskController.TaskControllerContext();
- taskContext.env =
- new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
- .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
- taskContext.task = task;
- // /////////// The method being tested
- taskController.initializeTask(taskContext);
- // ///////////
-
+ initializeTask();
checkTaskLocalization();
}
@@ -521,13 +492,14 @@ public class TestTaskTrackerLocalization
.getStrings("mapred.local.dir")) {
assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!",
childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(task
- .getUser(), jobId.toString(), taskId.toString(), false)));
+ .getUser(), jobId.toString(), taskId.toString(),
+ task.isTaskCleanupTask())));
}
// Make sure task task.getJobFile is changed and pointed correctly.
assertTrue(task.getJobFile().endsWith(
TaskTracker.getTaskConfFile(task.getUser(), jobId.toString(), taskId
- .toString(), false)));
+ .toString(), task.isTaskCleanupTask())));
// Make sure that the tmp directories are created
assertTrue("tmp dir is not created in workDir "
@@ -684,36 +656,54 @@ public class TestTaskTrackerLocalization
testTaskCleanup(false, true);// no needCleanup; jvmReuse
}
- /**
- * Validates if task cleanup is done properly
- */
- private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
- throws Exception {
- // Localize job and localize task.
- TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
- TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
- localizedJobConf = rjob.getJobConf();
- if (jvmReuse) {
- localizedJobConf.setNumTasksToExecutePerJvm(2);
- }
+ private void initializeTask() throws IOException {
tip.setJobConf(localizedJobConf);
+
+ // ////////// The central method being tested
tip.localizeTask(task);
- Path workDir =
+ // //////////
+
+ // check the functionality of localizeTask
+ for (String dir : trackerFConf.getStrings("mapred.local.dir")) {
+ File attemptDir =
+ new File(dir, TaskTracker.getLocalTaskDir(task.getUser(), jobId
+ .toString(), taskId.toString(), task.isTaskCleanupTask()));
+ assertTrue("attempt-dir " + attemptDir + " in localDir " + dir
+ + " is not created!!", attemptDir.exists());
+ }
+
+ attemptWorkDir =
lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
task.getUser(), task.getJobID().toString(), task.getTaskID()
.toString(), task.isTaskCleanupTask()), trackerFConf);
+ assertTrue("atttempt work dir for " + taskId.toString()
+ + " is not created in any of the configured dirs!!",
+ attemptWorkDir != null);
+
TaskRunner runner = task.createRunner(tracker, tip);
tip.setTaskRunner(runner);
+
+ // /////// Few more methods being tested
runner.setupChildTaskConfiguration(lDirAlloc);
- TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+ TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
localizedJobConf);
- runner.prepareLogFiles(task.getTaskID());
+ attemptLogFiles = runner.prepareLogFiles(task.getTaskID());
+
+ // Make sure the task-conf file is created
Path localTaskFile =
lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
.getUser(), task.getJobID().toString(), task.getTaskID()
.toString(), task.isTaskCleanupTask()), trackerFConf);
- JobConf localizedTaskConf = new JobConf(localTaskFile);
+ assertTrue("Task conf file " + localTaskFile.toString()
+ + " is not created!!", new File(localTaskFile.toUri().getPath())
+ .exists());
+
+ // /////// One more method being tested. This happens in child space.
+ localizedTaskConf = new JobConf(localTaskFile);
TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
+ // ///////
+
+ // Initialize task via TaskController
TaskControllerContext taskContext =
new TaskController.TaskControllerContext();
taskContext.env =
@@ -722,6 +712,21 @@ public class TestTaskTrackerLocalization
taskContext.task = task;
// /////////// The method being tested
taskController.initializeTask(taskContext);
+ // ///////////
+ }
+
+ /**
+ * Validates if task cleanup is done properly
+ */
+ private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
+ throws Exception {
+ // Localize job and localize task.
+ TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+ localizedJobConf = rjob.getJobConf();
+ if (jvmReuse) {
+ localizedJobConf.setNumTasksToExecutePerJvm(2);
+ }
+ initializeTask();
// TODO: Let the task run and create files.
@@ -792,7 +797,6 @@ public class TestTaskTrackerLocalization
}
// Initialize task dirs
- TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
tip.setJobConf(localizedJobConf);
tip.localizeTask(task);
@@ -835,4 +839,87 @@ public class TestTaskTrackerLocalization
assertFalse("Job " + task.getJobID() + " work dir exists after cleanup",
jWorkDirExists);
}
+
+ /**
+ * Tests TaskTracker restart after the localization.
+ *
+ * This tests the following steps:
+ *
+ * Localize Job, initialize a task.
+ * Then restart the Tracker.
+ * launch a cleanup attempt for the task.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void testTrackerRestart() throws IOException, InterruptedException {
+ if (!canRun()) {
+ return;
+ }
+
+ // Localize job and localize task.
+ TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+ localizedJobConf = rjob.getJobConf();
+ initializeTask();
+
+ // imitate tracker restart
+ startTracker();
+
+ // create a task cleanup attempt
+ createTask();
+ task.setTaskCleanupTask();
+ // register task
+ tip = tracker.new TaskInProgress(task, trackerFConf);
+
+ // localize the job again.
+ rjob = tracker.localizeJob(tip);
+ localizedJobConf = rjob.getJobConf();
+ checkJobLocalization();
+
+ // localize task cleanup attempt
+ initializeTask();
+ checkTaskLocalization();
+ }
+
+ /**
+ * Tests TaskTracker re-init after the localization.
+ *
+ * This tests the following steps:
+ *
+ * Localize Job, initialize a task.
+ * Then reinit the Tracker.
+ * launch a cleanup attempt for the task.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void testTrackerReinit() throws IOException, InterruptedException {
+ if (!canRun()) {
+ return;
+ }
+
+ // Localize job and localize task.
+ TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+ localizedJobConf = rjob.getJobConf();
+ initializeTask();
+
+ // imitate tracker reinit
+ initializeTracker();
+
+ // create a task cleanup attempt
+ createTask();
+ task.setTaskCleanupTask();
+ // register task
+ tip = tracker.new TaskInProgress(task, trackerFConf);
+
+ // localize the job again.
+ rjob = tracker.localizeJob(tip);
+ localizedJobConf = rjob.getJobConf();
+ checkJobLocalization();
+
+ // localize task cleanup attempt
+ initializeTask();
+ checkTaskLocalization();
+ }
+
}