You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2009/12/11 04:55:54 UTC

svn commit: r889496 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapreduce/JobContext.java src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java

Author: acmurthy
Date: Fri Dec 11 03:55:54 2009
New Revision: 889496

URL: http://svn.apache.org/viewvc?rev=889496&view=rev
Log:
MAPREDUCE-1171. Allow shuffle retries and read-error reporting to be configurable. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=889496&r1=889495&r2=889496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Dec 11 03:55:54 2009
@@ -985,3 +985,7 @@
 
     MAPREDUCE-1230. Fix handling of null records in VerticaInputFormat. (Omer
     Trajman via cdouglas)
+
+    MAPREDUCE-1171. Allow shuffle retries and read-error reporting to be
+    configurable. (Amareshwari Sriramadasu via acmurthy)
+

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=889496&r1=889495&r2=889496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Fri Dec 11 03:55:54 2009
@@ -212,6 +212,10 @@
     "mapreduce.reduce.shuffle.connect.timeout";
   public static final String SHUFFLE_READ_TIMEOUT = 
     "mapreduce.reduce.shuffle.read.timeout";
+  public static final String SHUFFLE_FETCH_FAILURES = 
+    "mapreduce.reduce.shuffle.maxfetchfailures";
+  public static final String SHUFFLE_NOTIFY_READERROR = 
+    "mapreduce.reduce.shuffle.notify.readerror";
   public static final String REDUCE_SKIP_INCR_PROC_COUNT = 
     "mapreduce.reduce.skip.proc-count.auto-incr";
   public static final String REDUCE_SKIP_MAXGROUPS = 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java?rev=889496&r1=889495&r2=889496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java Fri Dec 11 03:55:54 2009
@@ -37,6 +37,7 @@
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.task.reduce.MapHost.State;
@@ -82,10 +83,12 @@
   
   private int maxMapRuntime = 0;
   private int maxFailedUniqueFetches = 5;
+  private int maxFetchFailuresBeforeReporting;
   
   private long totalBytesShuffledTillNow = 0;
   private DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
 
+  private boolean reportReadErrorImmediately = true;
   
   public ShuffleScheduler(JobConf job, TaskStatus status,
                           ExceptionReporter reporter,
@@ -108,6 +111,10 @@
     referee.start();
     this.maxFailedUniqueFetches = Math.min(totalMaps,
         this.maxFailedUniqueFetches);
+    this.maxFetchFailuresBeforeReporting = job.getInt(
+        JobContext.SHUFFLE_FETCH_FAILURES, REPORT_FAILURE_LIMIT);
+    this.reportReadErrorImmediately = job.getBoolean(
+        JobContext.SHUFFLE_NOTIFY_READERROR, true);
   }
 
   public synchronized void copySucceeded(TaskAttemptID mapId, 
@@ -175,7 +182,6 @@
       }
     }
     
-    // Notify the JobTracker after every 'reportFailureLimit' failures
     checkAndInformJobTracker(failures, mapId, readError);
 
     checkReducerHealth();
@@ -188,9 +194,14 @@
     failedShuffleCounter.increment(1);
   }
   
+  // Notify the JobTracker of a fetch failure after a read error when
+  // 'reportReadErrorImmediately' is true, or whenever 'failures' is a multiple
+  // of 'maxFetchFailuresBeforeReporting' (limits below 1 are treated as 1).
   private void checkAndInformJobTracker(
       int failures, TaskAttemptID mapId, boolean readError) {
-    if (readError || ((failures % REPORT_FAILURE_LIMIT) == 0)) {
+    if ((reportReadErrorImmediately && readError)
+        || ((failures % Math.max(1, maxFetchFailuresBeforeReporting)) == 0)) {
+      LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
       status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) mapId);
     }
   }