You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2011/01/06 19:38:37 UTC
svn commit: r1056000 - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Author: todd
Date: Thu Jan 6 18:38:37 2011
New Revision: 1056000
URL: http://svn.apache.org/viewvc?rev=1056000&view=rev
Log:
MAPREDUCE-2096. Secure local filesystem IO from symlink vulnerabilities. Contributed by Todd Lipcon
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IndexCache.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SpillRecord.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jan 6 18:38:37 2011
@@ -458,6 +458,8 @@ Release 0.22.0 - Unreleased
MAPREDUCE-714. JobConf.findContainingJar unescapes unnecessarily on linux (todd)
+ MAPREDUCE-2096. Secure local filesystem IO from symlink vulnerabilities (todd)
+
Release 0.21.1 - Unreleased
NEW FEATURES
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IndexCache.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IndexCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IndexCache.java Thu Jan 6 18:38:37 2011
@@ -54,16 +54,18 @@ class IndexCache {
* @param reduce
* @param fileName The file to read the index information from if it is not
* already present in the cache
+ * @param expectedIndexOwner The expected owner of the index file
* @return The Index Information
* @throws IOException
*/
public IndexRecord getIndexInformation(String mapId, int reduce,
- Path fileName) throws IOException {
+ Path fileName, String expectedIndexOwner)
+ throws IOException {
IndexInformation info = cache.get(mapId);
if (info == null) {
- info = readIndexFileToCache(fileName, mapId);
+ info = readIndexFileToCache(fileName, mapId, expectedIndexOwner);
} else {
synchronized (info) {
while (null == info.mapSpillRecord) {
@@ -87,7 +89,9 @@ class IndexCache {
}
private IndexInformation readIndexFileToCache(Path indexFileName,
- String mapId) throws IOException {
+ String mapId,
+ String expectedIndexOwner)
+ throws IOException {
IndexInformation info;
IndexInformation newInd = new IndexInformation();
if ((info = cache.putIfAbsent(mapId, newInd)) != null) {
@@ -106,7 +110,7 @@ class IndexCache {
LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
SpillRecord tmp = null;
try {
- tmp = new SpillRecord(indexFileName, conf);
+ tmp = new SpillRecord(indexFileName, conf, expectedIndexOwner);
} catch (Throwable e) {
tmp = new SpillRecord(0);
cache.remove(mapId);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Thu Jan 6 18:38:37 2011
@@ -649,4 +649,9 @@ class LinuxTaskController extends TaskCo
protected String getTaskControllerExecutablePath() {
return taskControllerExe;
}
-}
+
+ @Override
+ String getRunAsUser(JobConf conf) {
+ return conf.getUser();
+ }
+}
\ No newline at end of file
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Thu Jan 6 18:38:37 2011
@@ -67,6 +67,7 @@ import org.apache.hadoop.mapreduce.MRCon
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
@@ -1655,7 +1656,8 @@ class MapTask extends Task {
// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
Path indexFileName = mapOutputFile.getSpillIndexFile(i);
- indexCacheList.add(new SpillRecord(indexFileName, job));
+ indexCacheList.add(new SpillRecord(indexFileName, job,
+ UserGroupInformation.getCurrentUser().getShortUserName()));
}
//make correction in the length to include the sequence file header
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SpillRecord.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SpillRecord.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SpillRecord.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SpillRecord.java Thu Jan 6 18:38:37 2011
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.mapred;
+import java.io.DataInputStream;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;
@@ -25,11 +27,12 @@ import java.util.zip.CheckedOutputStream
import java.util.zip.Checksum;
import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.PureJavaCrc32;
import static org.apache.hadoop.mapred.MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
@@ -47,15 +50,19 @@ class SpillRecord {
entries = buf.asLongBuffer();
}
- public SpillRecord(Path indexFileName, JobConf job) throws IOException {
- this(indexFileName, job, new PureJavaCrc32());
+ public SpillRecord(Path indexFileName, JobConf job, String expectedIndexOwner)
+ throws IOException {
+ this(indexFileName, job, new PureJavaCrc32(), expectedIndexOwner);
}
- public SpillRecord(Path indexFileName, JobConf job, Checksum crc)
+ public SpillRecord(Path indexFileName, JobConf job, Checksum crc,
+ String expectedIndexOwner)
throws IOException {
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
- final FSDataInputStream in = rfs.open(indexFileName);
+ final DataInputStream in =
+ new DataInputStream(SecureIOUtils.openForRead(
+ new File(indexFileName.toUri().getPath()), expectedIndexOwner, null));
try {
final long length = rfs.getFileStatus(indexFileName).getLen();
final int partitions = (int) length / MAP_OUTPUT_INDEX_RECORD_LENGTH;
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java Thu Jan 6 18:38:37 2011
@@ -434,4 +434,11 @@ public abstract class TaskController imp
*/
abstract void enableJobForCleanup(PathDeletionContext context)
throws IOException;
+
+ /**
+ * Returns the local unix user that a given job will run as.
+ */
+ String getRunAsUser(JobConf conf) {
+ return System.getProperty("user.name");
+ }
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Thu Jan 6 18:38:37 2011
@@ -26,6 +26,7 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
@@ -37,8 +38,10 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.Shell;
@@ -100,7 +103,8 @@ public class TaskLog {
boolean isCleanup)
throws IOException {
File indexFile = getIndexFile(taskid, isCleanup);
- BufferedReader fis = new BufferedReader(new java.io.FileReader(indexFile));
+ BufferedReader fis = new BufferedReader(new InputStreamReader(
+ SecureIOUtils.openForRead(indexFile, obtainLogDirOwner(taskid), null)));
//the format of the index file is
//LOG_DIR: <the dir where the task logs are really stored>
//stdout:<start-offset in the stdout file> <length>
@@ -146,6 +150,18 @@ public class TaskLog {
return new File(getAttemptDir(taskid, isCleanup), "log.index");
}
+ /**
+ * Obtain the owner of the log dir. This is
+ * determined by checking the job's log directory.
+ */
+ static String obtainLogDirOwner(TaskAttemptID taskid) throws IOException {
+ Configuration conf = new Configuration();
+ FileSystem raw = FileSystem.getLocal(conf).getRaw();
+ Path jobLogDir = new Path(getJobDir(taskid.getJobID()).getAbsolutePath());
+ FileStatus jobStat = raw.getFileStatus(jobLogDir);
+ return jobStat.getOwner();
+ }
+
static String getBaseLogDir() {
return System.getProperty("hadoop.log.dir");
}
@@ -164,9 +180,10 @@ public class TaskLog {
// To ensure atomicity of updates to index file, write to temporary index
// file first and then rename.
File tmpIndexFile = getTmpIndexFile(currentTaskid, isCleanup);
-
+
BufferedOutputStream bos =
- new BufferedOutputStream(new FileOutputStream(tmpIndexFile,false));
+ new BufferedOutputStream(
+ SecureIOUtils.createForWrite(tmpIndexFile, 0644));
DataOutputStream dos = new DataOutputStream(bos);
//the format of the index file is
//LOG_DIR: <the dir where the task logs are really stored>
@@ -295,7 +312,9 @@ public class TaskLog {
start += fileDetail.start;
end += fileDetail.start;
bytesRemaining = end - start;
- file = new FileInputStream(new File(fileDetail.location, kind.toString()));
+ String owner = obtainLogDirOwner(taskid);
+ file = SecureIOUtils.openForRead(new File(fileDetail.location, kind.toString()),
+ owner, null);
// skip upto start
long pos = 0;
while (pos < start) {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java Thu Jan 6 18:38:37 2011
@@ -28,6 +28,8 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
@@ -46,7 +48,10 @@ import org.apache.hadoop.util.StringUtil
@InterfaceStability.Unstable
public class TaskLogServlet extends HttpServlet {
private static final long serialVersionUID = -6615764817774487321L;
-
+
+ private static final Log LOG =
+ LogFactory.getLog(TaskLog.class);
+
private boolean haveTaskLog(TaskAttemptID taskId, boolean isCleanup,
TaskLog.LogName type) {
File f = TaskLog.getTaskLogFile(taskId, isCleanup, type);
@@ -105,11 +110,10 @@ public class TaskLogServlet extends Http
// do nothing
}
else {
- response.sendError(HttpServletResponse.SC_GONE,
- "Failed to retrieve " + filter + " log for task: " +
- taskId);
- out.write(("TaskLogServlet exception:\n" +
- StringUtils.stringifyException(ioe) + "\n").getBytes());
+ String msg = "Failed to retrieve " + filter + " log for task: " +
+ taskId;
+ LOG.warn(msg, ioe);
+ response.sendError(HttpServletResponse.SC_GONE, msg);
}
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu Jan 6 18:38:37 2011
@@ -216,7 +216,7 @@ abstract class TaskRunner extends Thread
} catch (FSError e) {
LOG.fatal("FSError", e);
try {
- tracker.fsError(t.getTaskID(), e.getMessage());
+ tracker.internalFsError(t.getTaskID(), e.getMessage());
} catch (IOException ie) {
LOG.fatal(t.getTaskID()+" reporting FSError", ie);
}
@@ -226,7 +226,7 @@ abstract class TaskRunner extends Thread
ByteArrayOutputStream baos = new ByteArrayOutputStream();
causeThrowable.printStackTrace(new PrintStream(baos));
try {
- tracker.reportDiagnosticInfo(t.getTaskID(), baos.toString());
+ tracker.internalReportDiagnosticInfo(t.getTaskID(), baos.toString());
} catch (IOException e) {
LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Jan 6 18:38:37 2011
@@ -20,6 +20,7 @@
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -66,7 +67,9 @@ import org.apache.hadoop.fs.LocalDirAllo
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
@@ -1155,7 +1158,13 @@ public class TaskTracker
String jobOwner = conf.getUser();
aclConf.set("user.name", jobOwner);
- FileOutputStream out = new FileOutputStream(aclFile);
+ FileOutputStream out;
+ try {
+ out = SecureIOUtils.createForWrite(aclFile, 0600);
+ } catch (SecureIOUtils.AlreadyExistsException aee) {
+ LOG.warn("Job ACL file already exists at " + aclFile, aee);
+ return;
+ }
try {
aclConf.writeXml(out);
} finally {
@@ -3191,6 +3200,21 @@ public class TaskTracker
}
}
+ /**
+ * Check that the current UGI is the JVM authorized to report
+ * for this particular job.
+ *
+ * @throws IOException for unauthorized access
+ */
+ private void ensureAuthorizedJVM(JobID jobId) throws IOException {
+ String currentJobId =
+ UserGroupInformation.getCurrentUser().getUserName();
+ if (!currentJobId.equals(jobId.toString())) {
+ throw new IOException ("JVM with " + currentJobId +
+ " is not authorized for " + jobId);
+ }
+ }
+
// ///////////////////////////////////////////////////////////////
// TaskUmbilicalProtocol
@@ -3201,6 +3225,7 @@ public class TaskTracker
*/
public synchronized JvmTask getTask(JvmContext context)
throws IOException {
+ ensureAuthorizedJVM(context.jvmId.getJobId());
JVMId jvmId = context.jvmId;
// save pid of task JVM sent by child
@@ -3239,6 +3264,7 @@ public class TaskTracker
public synchronized boolean statusUpdate(TaskAttemptID taskid,
TaskStatus taskStatus)
throws IOException {
+ ensureAuthorizedJVM(taskid.getJobID());
TaskInProgress tip = tasks.get(taskid);
if (tip != null) {
tip.reportProgress(taskStatus);
@@ -3255,6 +3281,16 @@ public class TaskTracker
* diagnostic info
*/
public synchronized void reportDiagnosticInfo(TaskAttemptID taskid, String info) throws IOException {
+ ensureAuthorizedJVM(taskid.getJobID());
+ internalReportDiagnosticInfo(taskid, info);
+ }
+
+ /**
+ * Same as reportDiagnosticInfo but does not authorize caller. This is used
+ * internally within MapReduce, whereas reportDiagnosticInfo may be called
+ * via RPC.
+ */
+ synchronized void internalReportDiagnosticInfo(TaskAttemptID taskid, String info) throws IOException {
TaskInProgress tip = tasks.get(taskid);
if (tip != null) {
tip.reportDiagnosticInfo(info);
@@ -3265,6 +3301,7 @@ public class TaskTracker
public synchronized void reportNextRecordRange(TaskAttemptID taskid,
SortedRanges.Range range) throws IOException {
+ ensureAuthorizedJVM(taskid.getJobID());
TaskInProgress tip = tasks.get(taskid);
if (tip != null) {
tip.reportNextRecordRange(range);
@@ -3276,6 +3313,7 @@ public class TaskTracker
/** Child checking to see if we're alive. Normally does nothing.*/
public synchronized boolean ping(TaskAttemptID taskid) throws IOException {
+ ensureAuthorizedJVM(taskid.getJobID());
return tasks.get(taskid) != null;
}
@@ -3286,6 +3324,7 @@ public class TaskTracker
public synchronized void commitPending(TaskAttemptID taskid,
TaskStatus taskStatus)
throws IOException {
+ ensureAuthorizedJVM(taskid.getJobID());
LOG.info("Task " + taskid + " is in commit-pending," +"" +
" task state:" +taskStatus.getRunState());
statusUpdate(taskid, taskStatus);
@@ -3304,6 +3343,7 @@ public class TaskTracker
*/
public synchronized void done(TaskAttemptID taskid)
throws IOException {
+ ensureAuthorizedJVM(taskid.getJobID());
TaskInProgress tip = tasks.get(taskid);
commitResponses.remove(taskid);
if (tip != null) {
@@ -3319,6 +3359,7 @@ public class TaskTracker
*/
public synchronized void shuffleError(TaskAttemptID taskId, String message)
throws IOException {
+ ensureAuthorizedJVM(taskId.getJobID());
LOG.fatal("Task: " + taskId + " - Killed due to Shuffle Failure: " + message);
TaskInProgress tip = runningTasks.get(taskId);
tip.reportDiagnosticInfo("Shuffle Error: " + message);
@@ -3330,6 +3371,16 @@ public class TaskTracker
*/
public synchronized void fsError(TaskAttemptID taskId, String message)
throws IOException {
+ ensureAuthorizedJVM(taskId.getJobID());
+ internalFsError(taskId, message);
+ }
+
+ /**
+ * Version of fsError() that does not do authorization checks, called by
+ * the TaskRunner.
+ */
+ synchronized void internalFsError(TaskAttemptID taskId, String message)
+ throws IOException {
LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
TaskInProgress tip = runningTasks.get(taskId);
tip.reportDiagnosticInfo("FSError: " + message);
@@ -3341,6 +3392,7 @@ public class TaskTracker
*/
public synchronized void fatalError(TaskAttemptID taskId, String msg)
throws IOException {
+ ensureAuthorizedJVM(taskId.getJobID());
LOG.fatal("Task: " + taskId + " - exited : " + msg);
TaskInProgress tip = runningTasks.get(taskId);
tip.reportDiagnosticInfo("Error: " + msg);
@@ -3675,17 +3727,19 @@ public class TaskTracker
// true iff IOException was caused by attempt to access input
boolean isInputException = false;
- FSDataInputStream mapOutputIn = null;
+ FileInputStream mapOutputIn = null;
byte[] buffer = new byte[MAX_BYTES_TO_READ];
long totalRead = 0;
String userName = null;
+ String runAsUserName = null;
synchronized (tracker.runningJobs) {
RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
if (rjob == null) {
throw new IOException("Unknown job " + jobId + "!!");
}
userName = rjob.jobConf.getUser();
+ runAsUserName = tracker.getTaskController().getRunAsUser(rjob.jobConf);
}
// Index file
Path indexFileName =
@@ -3704,17 +3758,19 @@ public class TaskTracker
* for the given reducer is available.
*/
IndexRecord info =
- tracker.indexCache.getIndexInformation(mapId, reduce, indexFileName);
-
+ tracker.indexCache.getIndexInformation(mapId, reduce, indexFileName,
+ runAsUserName);
+
try {
/**
* Read the data from the single map-output file and
* send it to the reducer.
*/
//open the map-output file
- mapOutputIn = localfs.open(mapOutputFileName);
+ mapOutputIn = SecureIOUtils.openForRead(
+ new File(mapOutputFileName.toUri().getPath()), runAsUserName, null);
//seek to the correct offset for the reduce
- mapOutputIn.seek(info.startOffset);
+ IOUtils.skipFully(mapOutputIn, info.startOffset);
// write header for each map output
ShuffleHeader header = new ShuffleHeader(mapId, info.partLength,
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java?rev=1056000&r1=1055999&r2=1056000&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java Thu Jan 6 18:38:37 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import junit.framework.TestCase;
@@ -56,7 +57,8 @@ public class TestIndexCache extends Test
Path f = new Path(p, Integer.toString(totalsize, 36));
writeFile(fs, f, totalsize, partsPerMap);
IndexRecord rec = cache.getIndexInformation(
- Integer.toString(totalsize, 36), r.nextInt(partsPerMap), f);
+ Integer.toString(totalsize, 36), r.nextInt(partsPerMap), f,
+ UserGroupInformation.getCurrentUser().getShortUserName());
checkRecord(rec, totalsize);
}
@@ -67,7 +69,8 @@ public class TestIndexCache extends Test
for (int i = bytesPerFile; i < 1024 * 1024; i += bytesPerFile) {
Path f = new Path(p, Integer.toString(i, 36));
IndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36),
- r.nextInt(partsPerMap), f);
+ r.nextInt(partsPerMap), f,
+ UserGroupInformation.getCurrentUser().getShortUserName());
checkRecord(rec, i);
}
@@ -75,14 +78,16 @@ public class TestIndexCache extends Test
Path f = new Path(p, Integer.toString(totalsize, 36));
writeFile(fs, f, totalsize, partsPerMap);
cache.getIndexInformation(Integer.toString(totalsize, 36),
- r.nextInt(partsPerMap), f);
+ r.nextInt(partsPerMap), f,
+ UserGroupInformation.getCurrentUser().getShortUserName());
fs.delete(f, false);
// oldest fails to read, or error
boolean fnf = false;
try {
cache.getIndexInformation(Integer.toString(bytesPerFile, 36),
- r.nextInt(partsPerMap), new Path(p, Integer.toString(bytesPerFile)));
+ r.nextInt(partsPerMap), new Path(p, Integer.toString(bytesPerFile)),
+ UserGroupInformation.getCurrentUser().getShortUserName());
} catch (IOException e) {
if (e.getCause() == null ||
!(e.getCause() instanceof FileNotFoundException)) {
@@ -97,11 +102,14 @@ public class TestIndexCache extends Test
// should find all the other entries
for (int i = bytesPerFile << 1; i < 1024 * 1024; i += bytesPerFile) {
IndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36),
- r.nextInt(partsPerMap), new Path(p, Integer.toString(i, 36)));
+ r.nextInt(partsPerMap), new Path(p, Integer.toString(i, 36)),
+ UserGroupInformation.getCurrentUser().getShortUserName());
checkRecord(rec, i);
}
IndexRecord rec = cache.getIndexInformation(Integer.toString(totalsize, 36),
- r.nextInt(partsPerMap), f);
+ r.nextInt(partsPerMap), f,
+ UserGroupInformation.getCurrentUser().getShortUserName());
+
checkRecord(rec, totalsize);
}
@@ -131,7 +139,8 @@ public class TestIndexCache extends Test
out.writeLong(iout.getChecksum().getValue());
dout.close();
try {
- cache.getIndexInformation("badindex", 7, f);
+ cache.getIndexInformation("badindex", 7, f,
+ UserGroupInformation.getCurrentUser().getShortUserName());
fail("Did not detect bad checksum");
} catch (IOException e) {
if (!(e.getCause() instanceof ChecksumException)) {