You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/09/03 00:11:16 UTC
svn commit: r1164742 - in /hadoop/common/branches/branch-0.20-security: ./
src/core/org/apache/hadoop/fs/ src/mapred/org/apache/hadoop/mapred/
src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/
Author: omalley
Date: Fri Sep 2 22:11:15 2011
New Revision: 1164742
URL: http://svn.apache.org/viewvc?rev=1164742&view=rev
Log:
MAPREDUCE-2804. Fixed a race condition in setting up the log directories
for tasks that are starting at the same time. (omalley)
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileUtil.java
hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1164742&r1=1164741&r2=1164742&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Sep 2 22:11:15 2011
@@ -140,6 +140,9 @@ Release 0.20.204.0 - 2011-8-25
BUG FIXES
+ MAPREDUCE-2804. Fixed a race condition in setting up the log directories
+ for tasks that are starting at the same time. (omalley)
+
MAPREDUCE-2846. Fixed a race condition in writing the log index file that
caused tasks to fail. (omalley)
Modified: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileUtil.java?rev=1164742&r1=1164741&r2=1164742&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileUtil.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileUtil.java Fri Sep 2 22:11:15 2011
@@ -25,7 +25,10 @@ import java.util.zip.ZipFile;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -561,9 +564,25 @@ public class FileUtil {
} catch(InterruptedException e){
//do nothing as of yet
}
+ if (returnVal != 0) {
+ LOG.warn("Command '" + cmd + "' failed " + returnVal +
+ " with: " + copyStderr(p));
+ }
return returnVal;
}
+ /**
+  * Read everything the given process wrote to its standard error stream.
+  * <p>
+  * The raw bytes are collected first and decoded once at the end, so a
+  * multi-byte character that straddles two reads of the buffer is not
+  * corrupted by being decoded in two halves.
+  * @param p the process whose error stream should be drained
+  * @return the stderr contents, decoded with the platform default charset
+  * @throws IOException if reading from the error stream fails
+  */
+ private static String copyStderr(Process p) throws IOException {
+   InputStream err = p.getErrorStream();
+   // fully qualified so that no additional import is required
+   java.io.ByteArrayOutputStream collected =
+     new java.io.ByteArrayOutputStream();
+   byte[] buff = new byte[4096];
+   int len = err.read(buff);
+   while (len > 0) {
+     collected.write(buff, 0, len);
+     len = err.read(buff);
+   }
+   // toString() decodes with the platform default charset, which is what
+   // new String(byte[], int, int) used for each chunk before
+   return collected.toString();
+ }
+
/**
* Change the permissions on a filename.
* @param filename the name of the file to change
@@ -608,6 +627,82 @@ public class FileUtil {
}
return shExec.getExitCode();
}
+
+ /**
+  * Set permissions to the required value. Uses the java primitives instead
+  * of forking if group == other and the native library is not available;
+  * otherwise the change is delegated to execSetPermission.
+  * <p>
+  * The java.io.File setters can only address "owner only" or "everybody",
+  * so each bit is first applied to everybody using the group action and is
+  * then corrected for the owner whenever the user action differs.
+  * @param f the file to change
+  * @param permission the new permissions
+  * @throws IOException if any of the individual permission changes is
+  *         reported as failed, or if the delegated native/shell call fails
+  */
+ public static void setPermission(File f, FsPermission permission
+                                  ) throws IOException {
+   FsAction user = permission.getUserAction();
+   FsAction group = permission.getGroupAction();
+   FsAction other = permission.getOtherAction();
+
+   // use the native/fork if the group/other permissions are different
+   // or if the native is available
+   if (group != other || NativeIO.isAvailable()) {
+     execSetPermission(f, permission);
+     return;
+   }
+
+   // read perms
+   boolean rv = f.setReadable(group.implies(FsAction.READ), false);
+   checkReturnValue(rv, f, permission);
+   if (group.implies(FsAction.READ) != user.implies(FsAction.READ)) {
+     // capture the owner-only result; previously the stale value from the
+     // call above was re-checked and a failure here went unnoticed
+     rv = f.setReadable(user.implies(FsAction.READ), true);
+     checkReturnValue(rv, f, permission);
+   }
+
+   // write perms
+   rv = f.setWritable(group.implies(FsAction.WRITE), false);
+   checkReturnValue(rv, f, permission);
+   if (group.implies(FsAction.WRITE) != user.implies(FsAction.WRITE)) {
+     rv = f.setWritable(user.implies(FsAction.WRITE), true);
+     checkReturnValue(rv, f, permission);
+   }
+
+   // exec perms
+   rv = f.setExecutable(group.implies(FsAction.EXECUTE), false);
+   checkReturnValue(rv, f, permission);
+   if (group.implies(FsAction.EXECUTE) != user.implies(FsAction.EXECUTE)) {
+     rv = f.setExecutable(user.implies(FsAction.EXECUTE), true);
+     checkReturnValue(rv, f, permission);
+   }
+ }
+
+ /**
+  * Turn a failed permission change into an exception.
+  * @param rv the result reported by the permission-changing call
+  * @param p the file whose permissions were being changed
+  * @param permission the permissions that were requested
+  * @throws IOException if rv is false
+  */
+ private static void checkReturnValue(boolean rv, File p,
+                                      FsPermission permission
+                                      ) throws IOException {
+   if (rv) {
+     return;
+   }
+   String octalMode = String.format("%04o", permission.toShort());
+   throw new IOException("Failed to set permissions of path: " + p +
+                         " to " + octalMode);
+ }
+
+ /**
+  * Apply the permissions through NativeIO.chmod when NativeIO reports it
+  * is available, and through the shell set-permission command otherwise.
+  * @param f the file to change
+  * @param permission the new permissions
+  * @throws IOException if the canonical path cannot be resolved or the
+  *         underlying call fails
+  */
+ private static void execSetPermission(File f,
+                                       FsPermission permission
+                                       ) throws IOException {
+   if (!NativeIO.isAvailable()) {
+     // no native support: run the command with a 4-digit octal mode
+     String octalMode = String.format("%04o", permission.toShort());
+     execCommand(f, Shell.SET_PERMISSION_COMMAND, octalMode);
+     return;
+   }
+   String target = f.getCanonicalPath();
+   NativeIO.chmod(target, permission.toShort());
+ }
+
+ /**
+  * Run a command with the canonical path of the given file appended as
+  * its last argument.
+  * @param f the file whose canonical path becomes the final argument
+  * @param cmd the command and its leading arguments
+  * @return whatever Shell.execCommand returns for the assembled command
+  * @throws IOException if the canonical path cannot be resolved or the
+  *         command execution fails
+  */
+ static String execCommand(File f, String... cmd) throws IOException {
+   final int count = cmd.length;
+   String[] fullCommand = new String[count + 1];
+   for (int i = 0; i < count; i++) {
+     fullCommand[i] = cmd[i];
+   }
+   fullCommand[count] = f.getCanonicalPath();
+   return Shell.execCommand(fullCommand);
+ }
/**
* Create a tmp file for a base file.
Modified: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java?rev=1164742&r1=1164741&r2=1164742&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java Fri Sep 2 22:11:15 2011
@@ -21,7 +21,6 @@ package org.apache.hadoop.fs;
import java.io.*;
import java.net.URI;
import java.nio.ByteBuffer;
-import java.nio.channels.FileLock;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
@@ -259,6 +258,7 @@ public class RawLocalFileSystem extends
if (pathToFile(src).renameTo(pathToFile(dst))) {
return true;
}
+ LOG.debug("Falling through to a copy of " + src + " to " + dst);
return FileUtil.copy(this, src, this, dst, true, getConf());
}
@@ -416,8 +416,8 @@ public class RawLocalFileSystem extends
IOException e = null;
try {
StringTokenizer t = new StringTokenizer(
- execCommand(new File(getPath().toUri()),
- Shell.getGET_PERMISSION_COMMAND()));
+ FileUtil.execCommand(new File(getPath().toUri()),
+ Shell.getGET_PERMISSION_COMMAND()));
//expected format
//-rw------- 1 username groupname ...
String permission = t.nextToken();
@@ -467,11 +467,11 @@ public class RawLocalFileSystem extends
}
if (username == null) {
- execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname);
+ FileUtil.execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname);
} else {
//OWNER[:[GROUP]]
String s = username + (groupname == null? "": ":" + groupname);
- execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
+ FileUtil.execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
}
}
@@ -480,70 +480,7 @@ public class RawLocalFileSystem extends
*/
@Override
public void setPermission(Path p, FsPermission permission
- ) throws IOException {
- FsAction user = permission.getUserAction();
- FsAction group = permission.getGroupAction();
- FsAction other = permission.getOtherAction();
-
- File f = pathToFile(p);
-
- // Fork chmod if group and other permissions are different...
- if (group != other) {
- execSetPermission(f, permission);
- return;
- }
-
- boolean rv = true;
-
- // read perms
- rv = f.setReadable(group.implies(FsAction.READ), false);
- checkReturnValue(rv, p, permission);
- if (group.implies(FsAction.READ) != user.implies(FsAction.READ)) {
- f.setReadable(user.implies(FsAction.READ), true);
- checkReturnValue(rv, p, permission);
- }
-
- // write perms
- rv = f.setWritable(group.implies(FsAction.WRITE), false);
- checkReturnValue(rv, p, permission);
- if (group.implies(FsAction.WRITE) != user.implies(FsAction.WRITE)) {
- f.setWritable(user.implies(FsAction.WRITE), true);
- checkReturnValue(rv, p, permission);
- }
-
- // exec perms
- rv = f.setExecutable(group.implies(FsAction.EXECUTE), false);
- checkReturnValue(rv, p, permission);
- if (group.implies(FsAction.EXECUTE) != user.implies(FsAction.EXECUTE)) {
- f.setExecutable(user.implies(FsAction.EXECUTE), true);
- checkReturnValue(rv, p, permission);
- }
- }
-
- private void checkReturnValue(boolean rv, Path p, FsPermission permission)
- throws IOException {
- if (!rv) {
- throw new IOException("Failed to set permissions of path: " + p + " to " +
- String.format("%04o", permission.toShort()));
- }
- }
-
- private void execSetPermission(File f, FsPermission permission)
- throws IOException {
- if (NativeIO.isAvailable()) {
- NativeIO.chmod(f.getCanonicalPath(),
- permission.toShort());
- } else {
- execCommand(f, Shell.SET_PERMISSION_COMMAND,
- String.format("%04o", permission.toShort()));
- }
- }
-
- private static String execCommand(File f, String... cmd) throws IOException {
- String[] args = new String[cmd.length + 1];
- System.arraycopy(cmd, 0, args, 0, cmd.length);
- args[cmd.length] = f.getCanonicalPath();
- String output = Shell.execCommand(args);
- return output;
+ ) throws IOException {
+ FileUtil.setPermission(pathToFile(p), permission);
}
}
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1164742&r1=1164741&r2=1164742&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Sep 2 22:11:15 2011
@@ -68,12 +68,8 @@ public class DefaultTaskController exten
@Override
public void createLogDir(TaskAttemptID taskID,
boolean isCleanup) throws IOException {
- boolean b = TaskLog.createTaskAttemptLogDir(taskID, isCleanup,
- localStorage.getGoodLocalDirs());
- if (!b) {
- LOG.warn("Creation of attempt log dir for " + taskID
- + " failed. Ignoring");
- }
+ TaskLog.createTaskAttemptLogDir(taskID, isCleanup,
+ localStorage.getGoodLocalDirs());
}
/**
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1164742&r1=1164741&r2=1164742&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Sep 2 22:11:15 2011
@@ -23,7 +23,6 @@ import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -50,7 +49,6 @@ import org.apache.hadoop.mapreduce.JobID
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -92,48 +90,29 @@ public class TaskLog {
* @param taskID attempt-id for which log dir is to be created
* @param isCleanup Is this attempt a cleanup attempt ?
* @param localDirs mapred local directories
- * @return true if attempt log directory creation is succeeded
* @throws IOException
*/
- public static boolean createTaskAttemptLogDir(TaskAttemptID taskID,
+ public static void createTaskAttemptLogDir(TaskAttemptID taskID,
boolean isCleanup, String[] localDirs) throws IOException{
String cleanupSuffix = isCleanup ? ".cleanup" : "";
String strAttemptLogDir = getTaskAttemptLogDir(taskID,
cleanupSuffix, localDirs);
File attemptLogDir = new File(strAttemptLogDir);
- boolean isSucceeded = attemptLogDir.mkdirs();
- if(isSucceeded) {
- String strLinkAttemptLogDir = getJobDir(
- taskID.getJobID()).getAbsolutePath() + File.separatorChar +
- taskID.toString() + cleanupSuffix;
- if (FileUtil.symLink(strAttemptLogDir, strLinkAttemptLogDir) != 0) {
- LOG.warn("Creation of symlink to attempt log dir failed.");
- isSucceeded = false;
- }
-
- File linkAttemptLogDir = new File(strLinkAttemptLogDir);
- if (!linkAttemptLogDir.isDirectory() && !linkAttemptLogDir.mkdirs()) {
- LOG.warn("Unable to create linkAttemptLogDir directory : "
- + linkAttemptLogDir.getPath());
- isSucceeded = false;
- }
-
- FileSystem localFS = FileSystem.getLocal(new Configuration());
-
- //Set permissions for target attempt log dir
- localFS.setPermission(new Path(attemptLogDir.getCanonicalPath()),
- new FsPermission((short)0700));
-
- //Set permissions for target job log dir
- localFS.setPermission(new Path(attemptLogDir.getParentFile().getCanonicalPath()),
- new FsPermission((short)0700));
-
- // Set permissions for job dir in userlogs
- localFS.setPermission(
- new Path(linkAttemptLogDir.getParentFile().getCanonicalPath()),
- new FsPermission((short)0700));
+ if (!attemptLogDir.mkdirs()) {
+ throw new IOException("Creation of " + attemptLogDir + " failed.");
}
- return isSucceeded;
+ String strLinkAttemptLogDir =
+ getJobDir(taskID.getJobID()).getAbsolutePath() + File.separatorChar +
+ taskID.toString() + cleanupSuffix;
+ if (FileUtil.symLink(strAttemptLogDir, strLinkAttemptLogDir) != 0) {
+ throw new IOException("Creation of symlink from " +
+ strLinkAttemptLogDir + " to " + strAttemptLogDir +
+ " failed.");
+ }
+
+ //Set permissions for target attempt log dir
+ FsPermission userOnly = new FsPermission((short) 0700);
+ FileUtil.setPermission(attemptLogDir, userOnly);
}
/**
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=1164742&r1=1164741&r2=1164742&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Fri Sep 2 22:11:15 2011
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.mapred.TaskController;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.TaskTracker;
-import org.apache.hadoop.mapreduce.JobID;
/**
*
@@ -112,15 +111,17 @@ public class Localizer {
if (fs.exists(userDir) || fs.mkdirs(userDir)) {
// Set permissions on the user-directory
- fs.setPermission(userDir, new FsPermission((short)0700));
+ FsPermission userOnly = new FsPermission((short) 0700);
+ FileUtil.setPermission(new File(userDir.toUri().getPath()),
+ userOnly);
userDirStatus = true;
// Set up the jobcache directory
- Path jobCacheDir =
- new Path(localDir, TaskTracker.getJobCacheSubdir(user));
- if (fs.exists(jobCacheDir) || fs.mkdirs(jobCacheDir)) {
+ File jobCacheDir =
+ new File(localDir, TaskTracker.getJobCacheSubdir(user));
+ if (jobCacheDir.exists() || jobCacheDir.mkdirs()) {
// Set permissions on the jobcache-directory
- fs.setPermission(jobCacheDir, new FsPermission((short)0700));
+ FileUtil.setPermission(jobCacheDir, userOnly);
jobCacheDirStatus = true;
} else {
LOG.warn("Unable to create job cache directory : "
@@ -128,11 +129,12 @@ public class Localizer {
}
// Set up the cache directory used for distributed cache files
- Path distributedCacheDir =
- new Path(localDir, TaskTracker.getPrivateDistributedCacheDir(user));
- if (fs.exists(distributedCacheDir) || fs.mkdirs(distributedCacheDir)) {
+ File distributedCacheDir =
+ new File(localDir,
+ TaskTracker.getPrivateDistributedCacheDir(user));
+ if (distributedCacheDir.exists() || distributedCacheDir.mkdirs()) {
// Set permissions on the distcache-directory
- fs.setPermission(distributedCacheDir, new FsPermission((short)0700));
+ FileUtil.setPermission(distributedCacheDir, userOnly);
distributedCacheDirStatus = true;
} else {
LOG.warn("Unable to create distributed-cache directory : "
@@ -164,51 +166,6 @@ public class Localizer {
}
/**
- * Prepare the job directories for a given job. To be called by the job
- * localization code, only if the job is not already localized.
- *
- * <br>
- * Here, we set 700 permissions on the job directories created on all disks.
- * This we do so as to avoid any misuse by other users till the time
- * {@link TaskController#initializeJob} is run at a
- * later time to set proper private permissions on the job directories. <br>
- *
- * @param user
- * @param jobId
- * @throws IOException
- */
- public void initializeJobDirs(String user, JobID jobId)
- throws IOException {
- boolean initJobDirStatus = false;
- String jobDirPath = TaskTracker.getLocalJobDir(user, jobId.toString());
- for (String localDir : localDirs) {
- Path jobDir = new Path(localDir, jobDirPath);
- if (fs.exists(jobDir)) {
- // this will happen on a partial execution of localizeJob. Sometimes
- // copying job.xml to the local disk succeeds but copying job.jar might
- // throw out an exception. We should clean up and then try again.
- fs.delete(jobDir, true);
- }
-
- boolean jobDirStatus = fs.mkdirs(jobDir);
- if (!jobDirStatus) {
- LOG.warn("Not able to create job directory " + jobDir.toString());
- }
-
- initJobDirStatus = initJobDirStatus || jobDirStatus;
-
- // job-dir has to be private to the TT
- fs.setPermission(jobDir, new FsPermission((short)0700));
- }
-
- if (!initJobDirStatus) {
- throw new IOException("Not able to initialize job directories "
- + "in any of the configured local directories for job "
- + jobId.toString());
- }
- }
-
- /**
* Create taskDirs on all the disks. Otherwise, in some cases, like when
* LinuxTaskController is in use, child might wish to balance load across
* disks but cannot itself create attempt directory because of the fact that
@@ -244,18 +201,4 @@ public class Localizer {
+ attemptId);
}
}
-
- /**
- * Create job log directory and set appropriate permissions for the directory.
- *
- * @param jobId
- */
- public void initializeJobLogDir(JobID jobId) throws IOException {
- Path jobUserLogDir = new Path(TaskLog.getJobDir(jobId).getCanonicalPath());
- if (!fs.mkdirs(jobUserLogDir)) {
- throw new IOException("Could not create job user log directory: " +
- jobUserLogDir);
- }
- fs.setPermission(jobUserLogDir, new FsPermission((short)0700));
- }
}