Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/12/12 20:58:21 UTC

svn commit: r726101 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Author: cdouglas
Date: Fri Dec 12 11:58:20 2008
New Revision: 726101

URL: http://svn.apache.org/viewvc?rev=726101&view=rev
Log:
HADOOP-4699. Remove checksum validation from map output servlet.
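
This commit drops server-side checksum verification from the TaskTracker's
map output servlet: instead of wrapping the partition in an IFileInputStream
and calling readWithChecksum(), the servlet now opens the map output through
the raw local filesystem and streams the bytes directly to the reducer. Below
is a minimal sketch of the copy loop the patch introduces (the class name,
method name, and buffer size are illustrative, not actual TaskTracker
members):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    public class PartitionCopy {
      // Illustrative stand-in for the servlet's MAX_BYTES_TO_READ constant.
      private static final int MAX_BYTES_TO_READ = 64 * 1024;

      // Copies exactly partLength bytes from in to out with no checksum
      // verification, returning the number of bytes actually sent.
      static long copyPartition(InputStream in, OutputStream out,
                                long partLength) throws IOException {
        byte[] buffer = new byte[MAX_BYTES_TO_READ];
        long rem = partLength;  // bytes remaining in this reduce's partition
        long totalRead = 0;
        int len = in.read(buffer, 0, (int) Math.min(rem, MAX_BYTES_TO_READ));
        while (rem > 0 && len >= 0) {
          rem -= len;
          out.write(buffer, 0, len);
          totalRead += len;
          len = in.read(buffer, 0, (int) Math.min(rem, MAX_BYTES_TO_READ));
        }
        return totalRead;
      }
    }

The loop ends either when the requested partLength bytes have been sent (rem
reaches 0) or on premature end-of-stream (read() returns -1), mirroring the
"while (rem > 0 && len >= 0)" condition in the patch below.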

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=726101&r1=726100&r2=726101&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Dec 12 11:58:20 2008
@@ -230,6 +230,8 @@
     HADOOP-4807. Adds JobClient commands to get the active/blacklisted tracker names.
     Also adds commands to display running/completed task attempt IDs. (ddas)
 
+    HADOOP-4699. Remove checksum validation from map output servlet. (cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to provide locations for splits

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=726101&r1=726100&r2=726101&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Dec 12 11:58:20 2008
@@ -62,7 +62,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
@@ -2807,11 +2806,9 @@
       OutputStream outStream = null;
       FSDataInputStream mapOutputIn = null;
  
-      IFileInputStream checksumInputStream = null;
-      
       long totalRead = 0;
-      ShuffleServerMetrics shuffleMetrics = (ShuffleServerMetrics)
-                                      context.getAttribute("shuffleServerMetrics");
+      ShuffleServerMetrics shuffleMetrics =
+        (ShuffleServerMetrics) context.getAttribute("shuffleServerMetrics");
       TaskTracker tracker = 
         (TaskTracker) context.getAttribute("task.tracker");
 
@@ -2821,8 +2818,8 @@
         JobConf conf = (JobConf) context.getAttribute("conf");
         LocalDirAllocator lDirAlloc = 
           (LocalDirAllocator)context.getAttribute("localDirAllocator");
-        FileSystem fileSys = 
-          (FileSystem) context.getAttribute("local.file.system");
+        FileSystem rfs = ((LocalFileSystem)
+            context.getAttribute("local.file.system")).getRaw();
 
         // Index file
         Path indexFileName = lDirAlloc.getLocalPathToRead(
@@ -2843,18 +2840,15 @@
        IndexRecord info = 
           tracker.indexCache.getIndexInformation(mapId, reduce,indexFileName);
           
-        final long startOffset = info.startOffset;
-        final long rawPartLength = info.rawLength;
-        final long partLength = info.partLength;
-
         //set the custom "Raw-Map-Output-Length" http header to 
         //the raw (decompressed) length
-        response.setHeader(RAW_MAP_OUTPUT_LENGTH, Long.toString(rawPartLength));
+        response.setHeader(RAW_MAP_OUTPUT_LENGTH,
+            Long.toString(info.rawLength));
 
         //set the custom "Map-Output-Length" http header to 
         //the actual number of bytes being transferred
-        response.setHeader(MAP_OUTPUT_LENGTH, 
-                           Long.toString(partLength));
+        response.setHeader(MAP_OUTPUT_LENGTH,
+            Long.toString(info.partLength));
 
         //use the same buffersize as used for reading the data from disk
         response.setBufferSize(MAX_BYTES_TO_READ);
@@ -2864,33 +2858,15 @@
          * send it to the reducer.
          */
         //open the map-output file
-        FileSystem rfs = ((LocalFileSystem)fileSys).getRaw();
-
         mapOutputIn = rfs.open(mapOutputFileName);
-        // TODO: Remove this after a 'fix' for HADOOP-3647
-        // The clever trick here to reduce the impact of the extra seek for
-        // logging the first key/value lengths is to read the lengths before
-        // the second seek for the actual shuffle. The second seek is almost
-        // a no-op since it is very short (go back length of two VInts) and the 
-        // data is almost guaranteed to be in the filesystem's buffers.
-        // WARN: This won't work for compressed map-outputs!
-        int firstKeyLength = 0;
-        int firstValueLength = 0;
-        if (partLength > 0) {
-          mapOutputIn.seek(startOffset);
-          firstKeyLength = WritableUtils.readVInt(mapOutputIn);
-          firstValueLength = WritableUtils.readVInt(mapOutputIn);
-        }
-        
 
         //seek to the correct offset for the reduce
-        mapOutputIn.seek(startOffset);
-        checksumInputStream = new IFileInputStream(mapOutputIn,partLength);
-          
-        int len = checksumInputStream.readWithChecksum(buffer, 0,
-                                   partLength < MAX_BYTES_TO_READ 
-                                   ? (int)partLength : MAX_BYTES_TO_READ);
-        while (len > 0) {
+        mapOutputIn.seek(info.startOffset);
+        long rem = info.partLength;
+        int len =
+          mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
+        while (rem > 0 && len >= 0) {
+          rem -= len;
           try {
             shuffleMetrics.outputBytes(len);
             outStream.write(buffer, 0, len);
@@ -2900,16 +2876,13 @@
             throw ie;
           }
           totalRead += len;
-          if (totalRead == partLength) break;
-          len = checksumInputStream.readWithChecksum(buffer, 0, 
-                        (partLength - totalRead) < MAX_BYTES_TO_READ
-                          ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
+          len =
+            mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
         }
-        
+
         LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce + 
-                 " from map: " + mapId + " given " + partLength + "/" + 
-                 rawPartLength + " from " + startOffset + " with (" + 
-                 firstKeyLength + ", " + firstValueLength + ")");
+                 " from map: " + mapId + " given " + info.partLength + "/" + 
+                 info.rawLength);
       } catch (IOException ie) {
         Log log = (Log) context.getAttribute("log");
         String errorMsg = ("getMapOutput(" + mapId + "," + reduceId + 
@@ -2923,8 +2896,8 @@
         shuffleMetrics.failedOutput();
         throw ie;
       } finally {
-        if (checksumInputStream != null) {
-          checksumInputStream.close();
+        if (null != mapOutputIn) {
+          mapOutputIn.close();
         }
         shuffleMetrics.serverHandlerFree();
         if (ClientTraceLog.isInfoEnabled()) {