You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/11 21:45:34 UTC

svn commit: r462918 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/TaskTracker.java

Author: cutting
Date: Wed Oct 11 12:45:33 2006
New Revision: 462918

URL: http://svn.apache.org/viewvc?view=rev&rev=462918
Log:
HADOOP-597.  Fix TaskTracker to not discard map outputs for errors in transmitting them to reduce nodes.  Contributed by Owen.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=462918&r1=462917&r2=462918
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Oct 11 12:45:33 2006
@@ -16,6 +16,9 @@
  4. HADOOP-598.  Fix tasks to retry when reporting completion, so that
     a single RPC timeout won't fail a task.  (omalley via cutting)
 
+ 5. HADOOP-597.  Fix TaskTracker to not discard map outputs for errors
+    in transmitting them to reduce nodes.  (omalley via cutting)
+
 
 Release 0.7.0 - 2006-10-06
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=462918&r1=462917&r2=462918
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Oct 11 12:45:33 2006
@@ -1365,30 +1365,39 @@
         Path filename = conf.getLocalPath(mapId+"/part-"+reduce+".out");
         response.setContentLength((int) fileSys.getLength(filename));
         InputStream inStream = null;
+        // true iff IOException was caused by attempt to access input
+        boolean isInputException = true;
         try {
           inStream = fileSys.open(filename);
           try {
             int len = inStream.read(buffer);
             while (len > 0) {
-              outStream.write(buffer, 0, len);
+              try {
+                outStream.write(buffer, 0, len);
+              } catch (IOException ie) {
+                isInputException = false;
+                throw ie;
+              }
               len = inStream.read(buffer);
             }
           } finally {
             inStream.close();
-            outStream.close();
           }
         } catch (IOException ie) {
           TaskTracker tracker = 
             (TaskTracker) context.getAttribute("task.tracker");
           Log log = (Log) context.getAttribute("log");
-          String errorMsg = "getMapOutput(" + mapId + "," + reduceId + 
-          ") failed :\n"+
-          StringUtils.stringifyException(ie);
+          String errorMsg = ("getMapOutput(" + mapId + "," + reduceId + 
+                             ") failed :\n"+
+                             StringUtils.stringifyException(ie));
           log.warn(errorMsg);
-          tracker.mapOutputLost(mapId, errorMsg);
+          if (isInputException) {
+            tracker.mapOutputLost(mapId, errorMsg);
+          }
           response.sendError(HttpServletResponse.SC_GONE, errorMsg);
           throw ie;
         } 
+        outStream.close();
       }
     }
 }