Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2011/01/06 19:38:37 UTC

svn commit: r1056000 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/

Author: todd
Date: Thu Jan  6 18:38:37 2011
New Revision: 1056000

URL: http://svn.apache.org/viewvc?rev=1056000&view=rev
Log:
MAPREDUCE-2096. Secure local filesystem IO from symlink vulnerabilities. Contributed by Todd Lipcon

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IndexCache.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SpillRecord.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jan  6 18:38:37 2011
@@ -458,6 +458,8 @@ Release 0.22.0 - Unreleased
 
     MAPREDUCE-714. JobConf.findContainingJar unescapes unnecessarily on linux (todd)
 
+    MAPREDUCE-2096. Secure local filesystem IO from symlink vulnerabilities (todd)
+
 Release 0.21.1 - Unreleased
 
   NEW FEATURES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IndexCache.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IndexCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IndexCache.java Thu Jan  6 18:38:37 2011
@@ -54,16 +54,18 @@ class IndexCache {
    * @param reduce
    * @param fileName The file to read the index information from if it is not
    *                 already present in the cache
+   * @param expectedIndexOwner The expected owner of the index file
    * @return The Index Information
    * @throws IOException
    */
   public IndexRecord getIndexInformation(String mapId, int reduce,
-      Path fileName) throws IOException {
+                                         Path fileName, String expectedIndexOwner)
+    throws IOException {
 
     IndexInformation info = cache.get(mapId);
 
     if (info == null) {
-      info = readIndexFileToCache(fileName, mapId);
+      info = readIndexFileToCache(fileName, mapId, expectedIndexOwner);
     } else {
       synchronized (info) {
         while (null == info.mapSpillRecord) {
@@ -87,7 +89,9 @@ class IndexCache {
   }
 
   private IndexInformation readIndexFileToCache(Path indexFileName,
-      String mapId) throws IOException {
+                                                String mapId,
+                                                String expectedIndexOwner)
+    throws IOException {
     IndexInformation info;
     IndexInformation newInd = new IndexInformation();
     if ((info = cache.putIfAbsent(mapId, newInd)) != null) {
@@ -106,7 +110,7 @@ class IndexCache {
     LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
     SpillRecord tmp = null;
     try { 
-      tmp = new SpillRecord(indexFileName, conf);
+      tmp = new SpillRecord(indexFileName, conf, expectedIndexOwner);
     } catch (Throwable e) { 
       tmp = new SpillRecord(0);
       cache.remove(mapId);

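The new expectedIndexOwner parameter threads the expected file owner down to the point where the index file is actually opened (see the SpillRecord change below). A minimal sketch of the new call shape, assuming an IndexCache instance as held by the TaskTracker; runAsUserName comes from TaskController.getRunAsUser(), introduced later in this patch, and the other names are as in the TaskTracker hunks below:

    // The fourth argument names the local user expected to own the index
    // file; the ownership check happens when SpillRecord opens the file.
    String runAsUserName = tracker.getTaskController().getRunAsUser(rjob.jobConf);
    IndexRecord rec = indexCache.getIndexInformation(
        mapId, reduce, indexFileName, runAsUserName);
    // rec.startOffset and rec.partLength then locate this reduce's slice.
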
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Thu Jan  6 18:38:37 2011
@@ -649,4 +649,9 @@ class LinuxTaskController extends TaskCo
   protected String getTaskControllerExecutablePath() {
     return taskControllerExe;
   }
-}
+
+  @Override
+  String getRunAsUser(JobConf conf) {
+    return conf.getUser();
+  }
+}
\ No newline at end of file

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Thu Jan  6 18:38:37 2011
@@ -67,6 +67,7 @@ import org.apache.hadoop.mapreduce.MRCon
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
@@ -1655,7 +1656,8 @@ class MapTask extends Task {
       // read in paged indices
       for (int i = indexCacheList.size(); i < numSpills; ++i) {
         Path indexFileName = mapOutputFile.getSpillIndexFile(i);
-        indexCacheList.add(new SpillRecord(indexFileName, job));
+        indexCacheList.add(new SpillRecord(indexFileName, job,
+            UserGroupInformation.getCurrentUser().getShortUserName()));
       }
 
       //make correction in the length to include the sequence file header

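Here the expected owner is simply the task's own user: a map task only reads back spill index files it wrote itself. Schematically:

    // Inside the map task: whoever this task runs as wrote the spills.
    String me = UserGroupInformation.getCurrentUser().getShortUserName();
    SpillRecord rec = new SpillRecord(indexFileName, job, me);
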
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SpillRecord.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SpillRecord.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SpillRecord.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SpillRecord.java Thu Jan  6 18:38:37 2011
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.DataInputStream;
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.LongBuffer;
@@ -25,11 +27,12 @@ import java.util.zip.CheckedOutputStream
 import java.util.zip.Checksum;
 
 import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.PureJavaCrc32;
 
 import static org.apache.hadoop.mapred.MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
@@ -47,15 +50,19 @@ class SpillRecord {
     entries = buf.asLongBuffer();
   }
 
-  public SpillRecord(Path indexFileName, JobConf job) throws IOException {
-    this(indexFileName, job, new PureJavaCrc32());
+  public SpillRecord(Path indexFileName, JobConf job, String expectedIndexOwner)
+    throws IOException {
+    this(indexFileName, job, new PureJavaCrc32(), expectedIndexOwner);
   }
 
-  public SpillRecord(Path indexFileName, JobConf job, Checksum crc)
+  public SpillRecord(Path indexFileName, JobConf job, Checksum crc,
+                     String expectedIndexOwner)
       throws IOException {
 
     final FileSystem rfs = FileSystem.getLocal(job).getRaw();
-    final FSDataInputStream in = rfs.open(indexFileName);
+    final DataInputStream in = 
+      new DataInputStream(SecureIOUtils.openForRead(
+          new File(indexFileName.toUri().getPath()), expectedIndexOwner, null));
     try {
       final long length = rfs.getFileStatus(indexFileName).getLen();
       final int partitions = (int) length / MAP_OUTPUT_INDEX_RECORD_LENGTH;

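Replacing rfs.open() with SecureIOUtils.openForRead() is the heart of the symlink defense: rather than trusting the path, the file is opened and its ownership then verified (where Hadoop's native code is available this can be done on the already-open descriptor, so a check-then-open race cannot be won by swapping in a symlink). A self-contained sketch of the read-side pattern; the path and owner are illustrative:

    import java.io.DataInputStream;
    import java.io.File;
    import org.apache.hadoop.io.SecureIOUtils;

    // Throws IOException if the opened file is not owned by "jobowner".
    DataInputStream in = new DataInputStream(
        SecureIOUtils.openForRead(
            new File("/local/dir/file.out.index"), // illustrative path
            "jobowner",  // expected owner, e.g. from getRunAsUser()
            null));      // null group: check the owning user only
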
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java Thu Jan  6 18:38:37 2011
@@ -434,4 +434,11 @@ public abstract class TaskController imp
    */
   abstract void enableJobForCleanup(PathDeletionContext context)
     throws IOException;
+
+  /**
+   * Returns the local unix user that a given job will run as.
+   */
+  String getRunAsUser(JobConf conf) {
+    return System.getProperty("user.name");
+  }
 }

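The base implementation matches insecure clusters, where every task runs as the TaskTracker daemon's own Unix user; LinuxTaskController (above) overrides it because the setuid task-controller launches tasks as the job owner. A sketch of the contrast, assuming the package-local controllers in this tree:

    JobConf jobConf = new JobConf();
    jobConf.setUser("alice");  // the job submitter

    // Insecure default: the daemon's identity, System.getProperty("user.name")
    String daemonUser = new DefaultTaskController().getRunAsUser(jobConf);

    // Secure setup: the job owner, here "alice"
    String jobOwner = new LinuxTaskController().getRunAsUser(jobConf);

Whatever getRunAsUser() returns becomes the expectedOwner for every SecureIOUtils read of that job's local files elsewhere in this patch.
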
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Thu Jan  6 18:38:37 2011
@@ -26,6 +26,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
@@ -37,8 +38,10 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
@@ -100,7 +103,8 @@ public class TaskLog {
                                                 boolean isCleanup) 
   throws IOException {
     File indexFile = getIndexFile(taskid, isCleanup);
-    BufferedReader fis = new BufferedReader(new java.io.FileReader(indexFile));
+    BufferedReader fis = new BufferedReader(new InputStreamReader(
+      SecureIOUtils.openForRead(indexFile, obtainLogDirOwner(taskid), null)));
     //the format of the index file is
     //LOG_DIR: <the dir where the task logs are really stored>
     //stdout:<start-offset in the stdout file> <length>
@@ -146,6 +150,18 @@ public class TaskLog {
     return new File(getAttemptDir(taskid, isCleanup), "log.index");
   }
 
+  /**
+   * Obtain the owner of the log dir. This is 
+   * determined by checking the job's log directory.
+   */
+  static String obtainLogDirOwner(TaskAttemptID taskid) throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem raw = FileSystem.getLocal(conf).getRaw();
+    Path jobLogDir = new Path(getJobDir(taskid.getJobID()).getAbsolutePath());
+    FileStatus jobStat = raw.getFileStatus(jobLogDir);
+    return jobStat.getOwner();
+  }
+
   static String getBaseLogDir() {
     return System.getProperty("hadoop.log.dir");
   }
@@ -164,9 +180,10 @@ public class TaskLog {
     // To ensure atomicity of updates to index file, write to temporary index
     // file first and then rename.
     File tmpIndexFile = getTmpIndexFile(currentTaskid, isCleanup);
-    
+
     BufferedOutputStream bos = 
-      new BufferedOutputStream(new FileOutputStream(tmpIndexFile,false));
+      new BufferedOutputStream(
+        SecureIOUtils.createForWrite(tmpIndexFile, 0644));
     DataOutputStream dos = new DataOutputStream(bos);
     //the format of the index file is
     //LOG_DIR: <the dir where the task logs are really stored>
@@ -295,7 +312,9 @@ public class TaskLog {
       start += fileDetail.start;
       end += fileDetail.start;
       bytesRemaining = end - start;
-      file = new FileInputStream(new File(fileDetail.location, kind.toString()));
+      String owner = obtainLogDirOwner(taskid);
+      file = SecureIOUtils.openForRead(new File(fileDetail.location, kind.toString()), 
+          owner, null);
       // skip upto start
       long pos = 0;
       while (pos < start) {

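On the write side, SecureIOUtils.createForWrite() creates the file exclusively with the requested mode (0644 here), failing instead of following anything already present at that path, which closes the hole where an attacker pre-plants a symlink at a predictable log location. A sketch of the pattern, mirroring the aclFile handling in the TaskTracker change below; the path is illustrative:

    import java.io.BufferedOutputStream;
    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.io.SecureIOUtils;

    File tmpIndexFile = new File("/logs/attempt_x/log.index.tmp"); // illustrative
    BufferedOutputStream bos;
    try {
      // Exclusive create, mode 0644: an existing file or symlink is an error.
      bos = new BufferedOutputStream(
          SecureIOUtils.createForWrite(tmpIndexFile, 0644));
    } catch (SecureIOUtils.AlreadyExistsException aee) {
      throw new IOException("won't overwrite " + tmpIndexFile, aee);
    }
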
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java Thu Jan  6 18:38:37 2011
@@ -28,6 +28,8 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
@@ -46,7 +48,10 @@ import org.apache.hadoop.util.StringUtil
 @InterfaceStability.Unstable
 public class TaskLogServlet extends HttpServlet {
   private static final long serialVersionUID = -6615764817774487321L;
-  
+
+  private static final Log LOG =
+    LogFactory.getLog(TaskLog.class);
+
   private boolean haveTaskLog(TaskAttemptID taskId, boolean isCleanup,
       TaskLog.LogName type) {
     File f = TaskLog.getTaskLogFile(taskId, isCleanup, type);
@@ -105,11 +110,10 @@ public class TaskLogServlet extends Http
         // do nothing
       }
       else {
-        response.sendError(HttpServletResponse.SC_GONE,
-                         "Failed to retrieve " + filter + " log for task: " + 
-                         taskId);
-        out.write(("TaskLogServlet exception:\n" + 
-                 StringUtils.stringifyException(ioe) + "\n").getBytes());
+        String msg = "Failed to retrieve " + filter + " log for task: " + 
+                     taskId;
+        LOG.warn(msg, ioe);
+        response.sendError(HttpServletResponse.SC_GONE, msg);
       }
     }
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu Jan  6 18:38:37 2011
@@ -216,7 +216,7 @@ abstract class TaskRunner extends Thread
     } catch (FSError e) {
       LOG.fatal("FSError", e);
       try {
-        tracker.fsError(t.getTaskID(), e.getMessage());
+        tracker.internalFsError(t.getTaskID(), e.getMessage());
       } catch (IOException ie) {
         LOG.fatal(t.getTaskID()+" reporting FSError", ie);
       }
@@ -226,7 +226,7 @@ abstract class TaskRunner extends Thread
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       causeThrowable.printStackTrace(new PrintStream(baos));
       try {
-        tracker.reportDiagnosticInfo(t.getTaskID(), baos.toString());
+        tracker.internalReportDiagnosticInfo(t.getTaskID(), baos.toString());
       } catch (IOException e) {
         LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
       }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Jan  6 18:38:37 2011
@@ -20,6 +20,7 @@
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -66,7 +67,9 @@ import org.apache.hadoop.fs.LocalDirAllo
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
@@ -1155,7 +1158,13 @@ public class TaskTracker 
     String jobOwner = conf.getUser();
     aclConf.set("user.name", jobOwner);
 
-    FileOutputStream out = new FileOutputStream(aclFile);
+    FileOutputStream out;
+    try {
+      out = SecureIOUtils.createForWrite(aclFile, 0600);
+    } catch (SecureIOUtils.AlreadyExistsException aee) {
+      LOG.warn("Job ACL file already exists at " + aclFile, aee);
+      return;
+    }
     try {
       aclConf.writeXml(out);
     } finally {
@@ -3191,6 +3200,21 @@ public class TaskTracker 
     }
   }
 
+  /**
+   * Check that the current UGI is the JVM authorized to report
+   * for this particular job.
+   *
+   * @throws IOException for unauthorized access
+   */
+  private void ensureAuthorizedJVM(JobID jobId) throws IOException {
+    String currentJobId = 
+      UserGroupInformation.getCurrentUser().getUserName();
+    if (!currentJobId.equals(jobId.toString())) {
+      throw new IOException ("JVM with " + currentJobId + 
+          " is not authorized for " + jobId);
+    }
+  }
+
     
   // ///////////////////////////////////////////////////////////////
   // TaskUmbilicalProtocol
@@ -3201,6 +3225,7 @@ public class TaskTracker 
    */
   public synchronized JvmTask getTask(JvmContext context) 
   throws IOException {
+    ensureAuthorizedJVM(context.jvmId.getJobId());
     JVMId jvmId = context.jvmId;
     
     // save pid of task JVM sent by child
@@ -3239,6 +3264,7 @@ public class TaskTracker 
   public synchronized boolean statusUpdate(TaskAttemptID taskid, 
                                               TaskStatus taskStatus) 
   throws IOException {
+    ensureAuthorizedJVM(taskid.getJobID());
     TaskInProgress tip = tasks.get(taskid);
     if (tip != null) {
       tip.reportProgress(taskStatus);
@@ -3255,6 +3281,16 @@ public class TaskTracker 
    * diagnostic info
    */
   public synchronized void reportDiagnosticInfo(TaskAttemptID taskid, String info) throws IOException {
+    ensureAuthorizedJVM(taskid.getJobID());
+    internalReportDiagnosticInfo(taskid, info);
+  }
+
+  /**
+   * Same as reportDiagnosticInfo but does not authorize caller. This is used
+   * internally within MapReduce, whereas reportDiagnosticInfo may be called
+   * via RPC.
+   */
+  synchronized void internalReportDiagnosticInfo(TaskAttemptID taskid, String info) throws IOException {
     TaskInProgress tip = tasks.get(taskid);
     if (tip != null) {
       tip.reportDiagnosticInfo(info);
@@ -3265,6 +3301,7 @@ public class TaskTracker 
   
   public synchronized void reportNextRecordRange(TaskAttemptID taskid, 
       SortedRanges.Range range) throws IOException {
+    ensureAuthorizedJVM(taskid.getJobID());
     TaskInProgress tip = tasks.get(taskid);
     if (tip != null) {
       tip.reportNextRecordRange(range);
@@ -3276,6 +3313,7 @@ public class TaskTracker 
 
   /** Child checking to see if we're alive.  Normally does nothing.*/
   public synchronized boolean ping(TaskAttemptID taskid) throws IOException {
+    ensureAuthorizedJVM(taskid.getJobID());
     return tasks.get(taskid) != null;
   }
 
@@ -3286,6 +3324,7 @@ public class TaskTracker 
   public synchronized void commitPending(TaskAttemptID taskid,
                                          TaskStatus taskStatus) 
   throws IOException {
+    ensureAuthorizedJVM(taskid.getJobID());
     LOG.info("Task " + taskid + " is in commit-pending," +"" +
              " task state:" +taskStatus.getRunState());
     statusUpdate(taskid, taskStatus);
@@ -3304,6 +3343,7 @@ public class TaskTracker 
    */
   public synchronized void done(TaskAttemptID taskid) 
   throws IOException {
+    ensureAuthorizedJVM(taskid.getJobID());
     TaskInProgress tip = tasks.get(taskid);
     commitResponses.remove(taskid);
     if (tip != null) {
@@ -3319,6 +3359,7 @@ public class TaskTracker 
    */  
   public synchronized void shuffleError(TaskAttemptID taskId, String message) 
   throws IOException { 
+    ensureAuthorizedJVM(taskId.getJobID());
     LOG.fatal("Task: " + taskId + " - Killed due to Shuffle Failure: " + message);
     TaskInProgress tip = runningTasks.get(taskId);
     tip.reportDiagnosticInfo("Shuffle Error: " + message);
@@ -3330,6 +3371,16 @@ public class TaskTracker 
    */  
   public synchronized void fsError(TaskAttemptID taskId, String message) 
   throws IOException {
+    ensureAuthorizedJVM(taskId.getJobID());
+    internalFsError(taskId, message);
+  }
+
+  /**
+   * Version of fsError() that does not do authorization checks, called by
+   * the TaskRunner.
+   */
+  synchronized void internalFsError(TaskAttemptID taskId, String message)
+  throws IOException {
     LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
     TaskInProgress tip = runningTasks.get(taskId);
     tip.reportDiagnosticInfo("FSError: " + message);
@@ -3341,6 +3392,7 @@ public class TaskTracker 
    */  
   public synchronized void fatalError(TaskAttemptID taskId, String msg) 
   throws IOException {
+    ensureAuthorizedJVM(taskId.getJobID());
     LOG.fatal("Task: " + taskId + " - exited : " + msg);
     TaskInProgress tip = runningTasks.get(taskId);
     tip.reportDiagnosticInfo("Error: " + msg);
@@ -3675,17 +3727,19 @@ public class TaskTracker 
       
       // true iff IOException was caused by attempt to access input
       boolean isInputException = false;
-      FSDataInputStream mapOutputIn = null;
+      FileInputStream mapOutputIn = null;
       byte[] buffer = new byte[MAX_BYTES_TO_READ];
       long totalRead = 0;
 
       String userName = null;
+      String runAsUserName = null;
       synchronized (tracker.runningJobs) {
         RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
         if (rjob == null) {
           throw new IOException("Unknown job " + jobId + "!!");
         }
         userName = rjob.jobConf.getUser();
+        runAsUserName = tracker.getTaskController().getRunAsUser(rjob.jobConf);
       }
       // Index file
       Path indexFileName =
@@ -3704,17 +3758,19 @@ public class TaskTracker 
        * for the given reducer is available.
        */
       IndexRecord info = 
-        tracker.indexCache.getIndexInformation(mapId, reduce, indexFileName);
-      
+        tracker.indexCache.getIndexInformation(mapId, reduce, indexFileName,
+            runAsUserName);
+
       try {
         /**
          * Read the data from the single map-output file and
          * send it to the reducer.
          */
         //open the map-output file
-        mapOutputIn = localfs.open(mapOutputFileName);
+        mapOutputIn = SecureIOUtils.openForRead(
+            new File(mapOutputFileName.toUri().getPath()), runAsUserName, null);
         //seek to the correct offset for the reduce
-        mapOutputIn.seek(info.startOffset);
+        IOUtils.skipFully(mapOutputIn, info.startOffset);
         
         // write header for each map output
         ShuffleHeader header = new ShuffleHeader(mapId, info.partLength,

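Two details in the TaskTracker changes are worth spelling out. ensureAuthorizedJVM() works because a task JVM authenticates to the umbilical protocol with its job token, and that token's UGI user name is the job ID string, so a child reporting against any other job fails the comparison. And since SecureIOUtils.openForRead() returns a plain FileInputStream, which cannot seek(), the old positioned read of the map output becomes a bounded forward skip. A self-contained sketch of that replacement; path, owner and offset are illustrative:

    import java.io.File;
    import java.io.FileInputStream;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.SecureIOUtils;

    FileInputStream mapOutputIn = SecureIOUtils.openForRead(
        new File("/local/dir/file.out"), "jobowner", null);
    long startOffset = 4096L;  // in the real code, IndexRecord.startOffset
    // Stands in for the old FSDataInputStream.seek(startOffset); throws
    // IOException if EOF arrives before the offset is reached.
    IOUtils.skipFully(mapOutputIn, startOffset);
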
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java Thu Jan  6 18:38:37 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 
 import junit.framework.TestCase;
@@ -56,7 +57,8 @@ public class TestIndexCache extends Test
       Path f = new Path(p, Integer.toString(totalsize, 36));
       writeFile(fs, f, totalsize, partsPerMap);
       IndexRecord rec = cache.getIndexInformation(
-          Integer.toString(totalsize, 36), r.nextInt(partsPerMap), f);
+        Integer.toString(totalsize, 36), r.nextInt(partsPerMap), f,
+        UserGroupInformation.getCurrentUser().getShortUserName());
       checkRecord(rec, totalsize);
     }
 
@@ -67,7 +69,8 @@ public class TestIndexCache extends Test
     for (int i = bytesPerFile; i < 1024 * 1024; i += bytesPerFile) {
       Path f = new Path(p, Integer.toString(i, 36));
       IndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36),
-          r.nextInt(partsPerMap), f);
+        r.nextInt(partsPerMap), f,
+        UserGroupInformation.getCurrentUser().getShortUserName());
       checkRecord(rec, i);
     }
 
@@ -75,14 +78,16 @@ public class TestIndexCache extends Test
     Path f = new Path(p, Integer.toString(totalsize, 36));
     writeFile(fs, f, totalsize, partsPerMap);
     cache.getIndexInformation(Integer.toString(totalsize, 36),
-        r.nextInt(partsPerMap), f);
+        r.nextInt(partsPerMap), f,
+        UserGroupInformation.getCurrentUser().getShortUserName());
     fs.delete(f, false);
 
     // oldest fails to read, or error
     boolean fnf = false;
     try {
       cache.getIndexInformation(Integer.toString(bytesPerFile, 36),
-          r.nextInt(partsPerMap), new Path(p, Integer.toString(bytesPerFile)));
+        r.nextInt(partsPerMap), new Path(p, Integer.toString(bytesPerFile)),
+        UserGroupInformation.getCurrentUser().getShortUserName());
     } catch (IOException e) {
       if (e.getCause() == null ||
           !(e.getCause()  instanceof FileNotFoundException)) {
@@ -97,11 +102,14 @@ public class TestIndexCache extends Test
     // should find all the other entries
     for (int i = bytesPerFile << 1; i < 1024 * 1024; i += bytesPerFile) {
       IndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36),
-          r.nextInt(partsPerMap), new Path(p, Integer.toString(i, 36)));
+          r.nextInt(partsPerMap), new Path(p, Integer.toString(i, 36)),
+          UserGroupInformation.getCurrentUser().getShortUserName());
       checkRecord(rec, i);
     }
     IndexRecord rec = cache.getIndexInformation(Integer.toString(totalsize, 36),
-        r.nextInt(partsPerMap), f);
+      r.nextInt(partsPerMap), f,
+      UserGroupInformation.getCurrentUser().getShortUserName());
+
     checkRecord(rec, totalsize);
   }
 
@@ -131,7 +139,8 @@ public class TestIndexCache extends Test
     out.writeLong(iout.getChecksum().getValue());
     dout.close();
     try {
-      cache.getIndexInformation("badindex", 7, f);
+      cache.getIndexInformation("badindex", 7, f,
+        UserGroupInformation.getCurrentUser().getShortUserName());
       fail("Did not detect bad checksum");
     } catch (IOException e) {
       if (!(e.getCause() instanceof ChecksumException)) {