You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/12/12 20:58:21 UTC
svn commit: r726101 - in /hadoop/core/trunk: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Author: cdouglas
Date: Fri Dec 12 11:58:20 2008
New Revision: 726101
URL: http://svn.apache.org/viewvc?rev=726101&view=rev
Log:
HADOOP-4699. Remove checksum validation from map output servlet.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=726101&r1=726100&r2=726101&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Dec 12 11:58:20 2008
@@ -230,6 +230,8 @@
HADOOP-4807. Adds JobClient commands to get the active/blacklisted tracker names.
Also adds commands to display running/completed task attempt IDs. (ddas)
+ HADOOP-4699. Remove checksum validation from map output servlet. (cdouglas)
+
OPTIMIZATIONS
HADOOP-3293. Fixes FileInputFormat to provide locations for splits
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=726101&r1=726100&r2=726101&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Dec 12 11:58:20 2008
@@ -62,7 +62,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
@@ -2807,11 +2806,9 @@
OutputStream outStream = null;
FSDataInputStream mapOutputIn = null;
- IFileInputStream checksumInputStream = null;
-
long totalRead = 0;
- ShuffleServerMetrics shuffleMetrics = (ShuffleServerMetrics)
- context.getAttribute("shuffleServerMetrics");
+ ShuffleServerMetrics shuffleMetrics =
+ (ShuffleServerMetrics) context.getAttribute("shuffleServerMetrics");
TaskTracker tracker =
(TaskTracker) context.getAttribute("task.tracker");
@@ -2821,8 +2818,8 @@
JobConf conf = (JobConf) context.getAttribute("conf");
LocalDirAllocator lDirAlloc =
(LocalDirAllocator)context.getAttribute("localDirAllocator");
- FileSystem fileSys =
- (FileSystem) context.getAttribute("local.file.system");
+ FileSystem rfs = ((LocalFileSystem)
+ context.getAttribute("local.file.system")).getRaw();
// Index file
Path indexFileName = lDirAlloc.getLocalPathToRead(
@@ -2843,18 +2840,15 @@
IndexRecord info =
tracker.indexCache.getIndexInformation(mapId, reduce,indexFileName);
- final long startOffset = info.startOffset;
- final long rawPartLength = info.rawLength;
- final long partLength = info.partLength;
-
//set the custom "Raw-Map-Output-Length" http header to
//the raw (decompressed) length
- response.setHeader(RAW_MAP_OUTPUT_LENGTH, Long.toString(rawPartLength));
+ response.setHeader(RAW_MAP_OUTPUT_LENGTH,
+ Long.toString(info.rawLength));
//set the custom "Map-Output-Length" http header to
//the actual number of bytes being transferred
- response.setHeader(MAP_OUTPUT_LENGTH,
- Long.toString(partLength));
+ response.setHeader(MAP_OUTPUT_LENGTH,
+ Long.toString(info.partLength));
//use the same buffersize as used for reading the data from disk
response.setBufferSize(MAX_BYTES_TO_READ);
@@ -2864,33 +2858,15 @@
* send it to the reducer.
*/
//open the map-output file
- FileSystem rfs = ((LocalFileSystem)fileSys).getRaw();
-
mapOutputIn = rfs.open(mapOutputFileName);
- // TODO: Remove this after a 'fix' for HADOOP-3647
- // The clever trick here to reduce the impact of the extra seek for
- // logging the first key/value lengths is to read the lengths before
- // the second seek for the actual shuffle. The second seek is almost
- // a no-op since it is very short (go back length of two VInts) and the
- // data is almost guaranteed to be in the filesystem's buffers.
- // WARN: This won't work for compressed map-outputs!
- int firstKeyLength = 0;
- int firstValueLength = 0;
- if (partLength > 0) {
- mapOutputIn.seek(startOffset);
- firstKeyLength = WritableUtils.readVInt(mapOutputIn);
- firstValueLength = WritableUtils.readVInt(mapOutputIn);
- }
-
//seek to the correct offset for the reduce
- mapOutputIn.seek(startOffset);
- checksumInputStream = new IFileInputStream(mapOutputIn,partLength);
-
- int len = checksumInputStream.readWithChecksum(buffer, 0,
- partLength < MAX_BYTES_TO_READ
- ? (int)partLength : MAX_BYTES_TO_READ);
- while (len > 0) {
+ mapOutputIn.seek(info.startOffset);
+ long rem = info.partLength;
+ int len =
+ mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
+ while (rem > 0 && len >= 0) {
+ rem -= len;
try {
shuffleMetrics.outputBytes(len);
outStream.write(buffer, 0, len);
@@ -2900,16 +2876,13 @@
throw ie;
}
totalRead += len;
- if (totalRead == partLength) break;
- len = checksumInputStream.readWithChecksum(buffer, 0,
- (partLength - totalRead) < MAX_BYTES_TO_READ
- ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
+ len =
+ mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
}
-
+
LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce +
- " from map: " + mapId + " given " + partLength + "/" +
- rawPartLength + " from " + startOffset + " with (" +
- firstKeyLength + ", " + firstValueLength + ")");
+ " from map: " + mapId + " given " + info.partLength + "/" +
+ info.rawLength);
} catch (IOException ie) {
Log log = (Log) context.getAttribute("log");
String errorMsg = ("getMapOutput(" + mapId + "," + reduceId +
@@ -2923,8 +2896,8 @@
shuffleMetrics.failedOutput();
throw ie;
} finally {
- if (checksumInputStream != null) {
- checksumInputStream.close();
+ if (null != mapOutputIn) {
+ mapOutputIn.close();
}
shuffleMetrics.serverHandlerFree();
if (ClientTraceLog.isInfoEnabled()) {