You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/09/03 00:11:16 UTC

svn commit: r1164742 - in /hadoop/common/branches/branch-0.20-security: ./ src/core/org/apache/hadoop/fs/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/

Author: omalley
Date: Fri Sep  2 22:11:15 2011
New Revision: 1164742

URL: http://svn.apache.org/viewvc?rev=1164742&view=rev
Log:
MAPREDUCE-2804. Fixed a race condition in setting up the log directories
for tasks that are starting at the same time. (omalley)

Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileUtil.java
    hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1164742&r1=1164741&r2=1164742&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Sep  2 22:11:15 2011
@@ -140,6 +140,9 @@ Release 0.20.204.0 - 2011-8-25
 
   BUG FIXES
 
+    MAPREDUCE-2804. Fixed a race condition in setting up the log directories
+    for tasks that are starting at the same time. (omalley)
+
     MAPREDUCE-2846. Fixed a race condition in writing the log index file that
     caused tasks to fail. (omalley)
 

Modified: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileUtil.java?rev=1164742&r1=1164741&r2=1164742&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileUtil.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileUtil.java Fri Sep  2 22:11:15 2011
@@ -25,7 +25,10 @@ import java.util.zip.ZipFile;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -561,9 +564,25 @@ public class FileUtil {
     } catch(InterruptedException e){
       //do nothing as of yet
     }
+    if (returnVal != 0) {
+      LOG.warn("Command '" + cmd + "' failed " + returnVal + 
+               " with: " + copyStderr(p));
+    }
     return returnVal;
   }
   
+  private static String copyStderr(Process p) throws IOException {
+    InputStream err = p.getErrorStream();
+    StringBuilder result = new StringBuilder();
+    byte[] buff = new byte[4096];
+    int len = err.read(buff);
+    while (len > 0) {
+      result.append(new String(buff, 0 , len));
+      len = err.read(buff);
+    }
+    return result.toString();
+  }
+
   /**
    * Change the permissions on a filename.
    * @param filename the name of the file to change
@@ -608,6 +627,82 @@ public class FileUtil {
     }
     return shExec.getExitCode();
   }
+
+  /**
+   * Set permissions to the required value. Uses the java.io.File primitives
+   * only if group == other and native IO is unavailable; else chmods directly.
+   * @param f the file to change
+   * @param permission the new permissions
+   * @throws IOException if any of the permission changes is reported as failed
+   */
+  public static void setPermission(File f, FsPermission permission
+                                   ) throws IOException {
+    FsAction user = permission.getUserAction();
+    FsAction group = permission.getGroupAction();
+    FsAction other = permission.getOtherAction();
+
+    // use the native/fork if the group/other permissions are different
+    // or if the native is available
+    if (group != other || NativeIO.isAvailable()) {
+      execSetPermission(f, permission);
+      return;
+    }
+
+    boolean rv = true;
+
+    // read perms: set for everybody first, then adjust the owner if it differs
+    rv = f.setReadable(group.implies(FsAction.READ), false);
+    checkReturnValue(rv, f, permission);
+    if (group.implies(FsAction.READ) != user.implies(FsAction.READ)) {
+      rv = f.setReadable(user.implies(FsAction.READ), true);
+      checkReturnValue(rv, f, permission);
+    }
+
+    // write perms: capture the owner-only result so a failure is not ignored
+    rv = f.setWritable(group.implies(FsAction.WRITE), false);
+    checkReturnValue(rv, f, permission);
+    if (group.implies(FsAction.WRITE) != user.implies(FsAction.WRITE)) {
+      rv = f.setWritable(user.implies(FsAction.WRITE), true);
+      checkReturnValue(rv, f, permission);
+    }
+
+    // exec perms: same two-step pattern as read and write
+    rv = f.setExecutable(group.implies(FsAction.EXECUTE), false);
+    checkReturnValue(rv, f, permission);
+    if (group.implies(FsAction.EXECUTE) != user.implies(FsAction.EXECUTE)) {
+      rv = f.setExecutable(user.implies(FsAction.EXECUTE), true);
+      checkReturnValue(rv, f, permission);
+    }
+  }
+
+  private static void checkReturnValue(boolean rv, File p, 
+                                       FsPermission permission
+                                       ) throws IOException {
+    if (!rv) {
+      throw new IOException("Failed to set permissions of path: " + p + 
+                            " to " + 
+                            String.format("%04o", permission.toShort()));
+    }
+  }
+  
+  private static void execSetPermission(File f, 
+                                        FsPermission permission
+                                       )  throws IOException {
+    if (NativeIO.isAvailable()) {
+      NativeIO.chmod(f.getCanonicalPath(), permission.toShort());
+    } else {
+      execCommand(f, Shell.SET_PERMISSION_COMMAND,
+                  String.format("%04o", permission.toShort()));
+    }
+  }
+  
+  static String execCommand(File f, String... cmd) throws IOException {
+    String[] args = new String[cmd.length + 1];
+    System.arraycopy(cmd, 0, args, 0, cmd.length);
+    args[cmd.length] = f.getCanonicalPath();
+    String output = Shell.execCommand(args);
+    return output;
+  }
   
   /**
    * Create a tmp file for a base file.

Modified: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java?rev=1164742&r1=1164741&r2=1164742&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java Fri Sep  2 22:11:15 2011
@@ -21,7 +21,6 @@ package org.apache.hadoop.fs;
 import java.io.*;
 import java.net.URI;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileLock;
 import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
@@ -259,6 +258,7 @@ public class RawLocalFileSystem extends 
     if (pathToFile(src).renameTo(pathToFile(dst))) {
       return true;
     }
+    LOG.debug("Falling through to a copy of " + src + " to " + dst);
     return FileUtil.copy(this, src, this, dst, true, getConf());
   }
   
@@ -416,8 +416,8 @@ public class RawLocalFileSystem extends 
       IOException e = null;
       try {
         StringTokenizer t = new StringTokenizer(
-            execCommand(new File(getPath().toUri()), 
-                        Shell.getGET_PERMISSION_COMMAND()));
+            FileUtil.execCommand(new File(getPath().toUri()), 
+                                 Shell.getGET_PERMISSION_COMMAND()));
         //expected format
         //-rw-------    1 username groupname ...
         String permission = t.nextToken();
@@ -467,11 +467,11 @@ public class RawLocalFileSystem extends 
     }
 
     if (username == null) {
-      execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname); 
+      FileUtil.execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname); 
     } else {
       //OWNER[:[GROUP]]
       String s = username + (groupname == null? "": ":" + groupname);
-      execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
+      FileUtil.execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
     }
   }
 
@@ -480,70 +480,7 @@ public class RawLocalFileSystem extends 
    */
   @Override
   public void setPermission(Path p, FsPermission permission
-      ) throws IOException {
-    FsAction user = permission.getUserAction();
-    FsAction group = permission.getGroupAction();
-    FsAction other = permission.getOtherAction();
-    
-    File f = pathToFile(p);
-    
-    // Fork chmod if group and other permissions are different...
-    if (group != other) {
-      execSetPermission(f, permission);
-      return;
-    }
-    
-    boolean rv = true;
-    
-    // read perms
-    rv = f.setReadable(group.implies(FsAction.READ), false);
-    checkReturnValue(rv, p, permission);
-    if (group.implies(FsAction.READ) != user.implies(FsAction.READ)) {
-      f.setReadable(user.implies(FsAction.READ), true);
-      checkReturnValue(rv, p, permission);
-    }
-
-    // write perms
-    rv = f.setWritable(group.implies(FsAction.WRITE), false);
-    checkReturnValue(rv, p, permission);
-    if (group.implies(FsAction.WRITE) != user.implies(FsAction.WRITE)) {
-      f.setWritable(user.implies(FsAction.WRITE), true);
-      checkReturnValue(rv, p, permission);
-    }
-
-    // exec perms
-    rv = f.setExecutable(group.implies(FsAction.EXECUTE), false);
-    checkReturnValue(rv, p, permission);
-    if (group.implies(FsAction.EXECUTE) != user.implies(FsAction.EXECUTE)) {
-      f.setExecutable(user.implies(FsAction.EXECUTE), true);
-      checkReturnValue(rv, p, permission);
-    }
-  }
-
-  private void checkReturnValue(boolean rv, Path p, FsPermission permission) 
-  throws IOException {
-    if (!rv) {
-      throw new IOException("Failed to set permissions of path: " + p + " to " + 
-                            String.format("%04o", permission.toShort()));
-    }
-  }
-  
-  private void execSetPermission(File f, FsPermission permission) 
-  throws IOException {
-    if (NativeIO.isAvailable()) {
-      NativeIO.chmod(f.getCanonicalPath(),
-      permission.toShort());
-    } else {
-      execCommand(f, Shell.SET_PERMISSION_COMMAND,
-          String.format("%04o", permission.toShort()));
-    }
-  }
-  
-  private static String execCommand(File f, String... cmd) throws IOException {
-    String[] args = new String[cmd.length + 1];
-    System.arraycopy(cmd, 0, args, 0, cmd.length);
-    args[cmd.length] = f.getCanonicalPath();
-    String output = Shell.execCommand(args);
-    return output;
+                            ) throws IOException {
+    FileUtil.setPermission(pathToFile(p), permission);
   }
 }

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1164742&r1=1164741&r2=1164742&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Sep  2 22:11:15 2011
@@ -68,12 +68,8 @@ public class DefaultTaskController exten
   @Override
   public void createLogDir(TaskAttemptID taskID, 
 		  			boolean isCleanup) throws IOException {
-	boolean b = TaskLog.createTaskAttemptLogDir(taskID, isCleanup, 
-	    		 		localStorage.getGoodLocalDirs());
-	if (!b) {
-	    LOG.warn("Creation of attempt log dir for " + taskID
-	                 + " failed. Ignoring");
-	}
+    TaskLog.createTaskAttemptLogDir(taskID, isCleanup, 
+                                    localStorage.getGoodLocalDirs());
   }
   
   /**

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1164742&r1=1164741&r2=1164742&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Sep  2 22:11:15 2011
@@ -23,7 +23,6 @@ import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -50,7 +49,6 @@ import org.apache.hadoop.mapreduce.JobID
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Appender;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -92,48 +90,29 @@ public class TaskLog {
    * @param taskID attempt-id for which log dir is to be created
    * @param isCleanup Is this attempt a cleanup attempt ?
    * @param localDirs mapred local directories
-   * @return true if attempt log directory creation is succeeded
    * @throws IOException
    */
-  public static boolean createTaskAttemptLogDir(TaskAttemptID taskID,
+  public static void createTaskAttemptLogDir(TaskAttemptID taskID,
       boolean isCleanup, String[] localDirs) throws IOException{
     String cleanupSuffix = isCleanup ? ".cleanup" : "";
     String strAttemptLogDir = getTaskAttemptLogDir(taskID, 
         cleanupSuffix, localDirs);
     File attemptLogDir = new File(strAttemptLogDir);
-    boolean isSucceeded = attemptLogDir.mkdirs();
-    if(isSucceeded) {
-      String strLinkAttemptLogDir = getJobDir(
-          taskID.getJobID()).getAbsolutePath() + File.separatorChar + 
-          taskID.toString() + cleanupSuffix;
-      if (FileUtil.symLink(strAttemptLogDir, strLinkAttemptLogDir) != 0) {
-        LOG.warn("Creation of symlink to attempt log dir failed.");
-        isSucceeded = false;
-      } 
-
-      File linkAttemptLogDir = new File(strLinkAttemptLogDir);
-      if (!linkAttemptLogDir.isDirectory() && !linkAttemptLogDir.mkdirs()) {
-        LOG.warn("Unable to create linkAttemptLogDir directory : "
-                 + linkAttemptLogDir.getPath());
-        isSucceeded = false;
-      }
-
-      FileSystem localFS = FileSystem.getLocal(new Configuration());
-
-      //Set permissions for target attempt log dir 
-      localFS.setPermission(new Path(attemptLogDir.getCanonicalPath()),
-                            new FsPermission((short)0700));
-
-      //Set permissions for target job log dir
-      localFS.setPermission(new Path(attemptLogDir.getParentFile().getCanonicalPath()),
-                            new FsPermission((short)0700));
-
-      // Set permissions for job dir in userlogs
-      localFS.setPermission(
-          new Path(linkAttemptLogDir.getParentFile().getCanonicalPath()),
-          new FsPermission((short)0700));
+    if (!attemptLogDir.mkdirs()) {
+      throw new IOException("Creation of " + attemptLogDir + " failed.");
     }
-    return isSucceeded;
+    String strLinkAttemptLogDir = 
+        getJobDir(taskID.getJobID()).getAbsolutePath() + File.separatorChar + 
+        taskID.toString() + cleanupSuffix;
+    if (FileUtil.symLink(strAttemptLogDir, strLinkAttemptLogDir) != 0) {
+      throw new IOException("Creation of symlink from " + 
+                            strLinkAttemptLogDir + " to " + strAttemptLogDir +
+                            " failed.");
+    }
+
+    //Set permissions for target attempt log dir 
+    FsPermission userOnly = new FsPermission((short) 0700);
+    FileUtil.setPermission(attemptLogDir, userOnly);
   }
 
   /**

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=1164742&r1=1164741&r2=1164742&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Fri Sep  2 22:11:15 2011
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskTracker;
-import org.apache.hadoop.mapreduce.JobID;
 
 /**
  * 
@@ -112,15 +111,17 @@ public class Localizer {
         if (fs.exists(userDir) || fs.mkdirs(userDir)) {
 
           // Set permissions on the user-directory
-          fs.setPermission(userDir, new FsPermission((short)0700));
+          FsPermission userOnly = new FsPermission((short) 0700);
+          FileUtil.setPermission(new File(userDir.toUri().getPath()), 
+                                 userOnly);
           userDirStatus = true;
 
           // Set up the jobcache directory
-          Path jobCacheDir =
-              new Path(localDir, TaskTracker.getJobCacheSubdir(user));
-          if (fs.exists(jobCacheDir) || fs.mkdirs(jobCacheDir)) {
+          File jobCacheDir =
+              new File(localDir, TaskTracker.getJobCacheSubdir(user));
+          if (jobCacheDir.exists() || jobCacheDir.mkdirs()) {
             // Set permissions on the jobcache-directory
-            fs.setPermission(jobCacheDir, new FsPermission((short)0700));
+            FileUtil.setPermission(jobCacheDir, userOnly);
             jobCacheDirStatus = true;
           } else {
             LOG.warn("Unable to create job cache directory : "
@@ -128,11 +129,12 @@ public class Localizer {
           }
 
           // Set up the cache directory used for distributed cache files
-          Path distributedCacheDir =
-              new Path(localDir, TaskTracker.getPrivateDistributedCacheDir(user));
-          if (fs.exists(distributedCacheDir) || fs.mkdirs(distributedCacheDir)) {
+          File distributedCacheDir =
+              new File(localDir, 
+                       TaskTracker.getPrivateDistributedCacheDir(user));
+          if (distributedCacheDir.exists() || distributedCacheDir.mkdirs()) {
             // Set permissions on the distcache-directory
-            fs.setPermission(distributedCacheDir, new FsPermission((short)0700));
+            FileUtil.setPermission(distributedCacheDir, userOnly);
             distributedCacheDirStatus = true;
           } else {
             LOG.warn("Unable to create distributed-cache directory : "
@@ -164,51 +166,6 @@ public class Localizer {
   }
 
   /**
-   * Prepare the job directories for a given job. To be called by the job
-   * localization code, only if the job is not already localized.
-   * 
-   * <br>
-   * Here, we set 700 permissions on the job directories created on all disks.
-   * This we do so as to avoid any misuse by other users till the time
-   * {@link TaskController#initializeJob} is run at a
-   * later time to set proper private permissions on the job directories. <br>
-   * 
-   * @param user
-   * @param jobId
-   * @throws IOException
-   */
-  public void initializeJobDirs(String user, JobID jobId)
-      throws IOException {
-    boolean initJobDirStatus = false;
-    String jobDirPath = TaskTracker.getLocalJobDir(user, jobId.toString());
-    for (String localDir : localDirs) {
-      Path jobDir = new Path(localDir, jobDirPath);
-      if (fs.exists(jobDir)) {
-        // this will happen on a partial execution of localizeJob. Sometimes
-        // copying job.xml to the local disk succeeds but copying job.jar might
-        // throw out an exception. We should clean up and then try again.
-        fs.delete(jobDir, true);
-      }
-
-      boolean jobDirStatus = fs.mkdirs(jobDir);
-      if (!jobDirStatus) {
-        LOG.warn("Not able to create job directory " + jobDir.toString());
-      }
-
-      initJobDirStatus = initJobDirStatus || jobDirStatus;
-
-      // job-dir has to be private to the TT
-      fs.setPermission(jobDir, new FsPermission((short)0700));
-    }
-
-    if (!initJobDirStatus) {
-      throw new IOException("Not able to initialize job directories "
-          + "in any of the configured local directories for job "
-          + jobId.toString());
-    }
-  }
-
-  /**
    * Create taskDirs on all the disks. Otherwise, in some cases, like when
    * LinuxTaskController is in use, child might wish to balance load across
    * disks but cannot itself create attempt directory because of the fact that
@@ -244,18 +201,4 @@ public class Localizer {
           + attemptId);
     }
   }
-
-  /**
-   * Create job log directory and set appropriate permissions for the directory.
-   * 
-   * @param jobId
-   */
-  public void initializeJobLogDir(JobID jobId) throws IOException {
-    Path jobUserLogDir = new Path(TaskLog.getJobDir(jobId).getCanonicalPath());
-    if (!fs.mkdirs(jobUserLogDir)) {
-      throw new IOException("Could not create job user log directory: " +
-                            jobUserLogDir);
-    }
-    fs.setPermission(jobUserLogDir, new FsPermission((short)0700));
-  }
 }