You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2009/12/11 04:55:54 UTC
svn commit: r889496 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapreduce/JobContext.java
src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
Author: acmurthy
Date: Fri Dec 11 03:55:54 2009
New Revision: 889496
URL: http://svn.apache.org/viewvc?rev=889496&view=rev
Log:
MAPREDUCE-1171. Allow shuffle retries and read-error reporting to be configurable. Contributed by Amareshwari Sriramadasu.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=889496&r1=889495&r2=889496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Dec 11 03:55:54 2009
@@ -985,3 +985,7 @@
MAPREDUCE-1230. Fix handling of null records in VerticaInputFormat. (Omer
Trajman via cdouglas)
+
+ MAPREDUCE-1171. Allow shuffle retries and read-error reporting to be
+ configurable. (Amareshwari Sriramadasu via acmurthy)
+
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=889496&r1=889495&r2=889496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Fri Dec 11 03:55:54 2009
@@ -212,6 +212,10 @@
"mapreduce.reduce.shuffle.connect.timeout";
public static final String SHUFFLE_READ_TIMEOUT =
"mapreduce.reduce.shuffle.read.timeout";
+ public static final String SHUFFLE_FETCH_FAILURES =
+ "mapreduce.reduce.shuffle.maxfetchfailures";
+ public static final String SHUFFLE_NOTIFY_READERROR =
+ "mapreduce.reduce.shuffle.notify.readerror";
public static final String REDUCE_SKIP_INCR_PROC_COUNT =
"mapreduce.reduce.skip.proc-count.auto-incr";
public static final String REDUCE_SKIP_MAXGROUPS =
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java?rev=889496&r1=889495&r2=889496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java Fri Dec 11 03:55:54 2009
@@ -37,6 +37,7 @@
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.task.reduce.MapHost.State;
@@ -82,10 +83,12 @@
private int maxMapRuntime = 0;
private int maxFailedUniqueFetches = 5;
+ private int maxFetchFailuresBeforeReporting;
private long totalBytesShuffledTillNow = 0;
private DecimalFormat mbpsFormat = new DecimalFormat("0.00");
+ private boolean reportReadErrorImmediately = true;
public ShuffleScheduler(JobConf job, TaskStatus status,
ExceptionReporter reporter,
@@ -108,6 +111,10 @@
referee.start();
this.maxFailedUniqueFetches = Math.min(totalMaps,
this.maxFailedUniqueFetches);
+ this.maxFetchFailuresBeforeReporting = job.getInt(
+ JobContext.SHUFFLE_FETCH_FAILURES, REPORT_FAILURE_LIMIT);
+ this.reportReadErrorImmediately = job.getBoolean(
+ JobContext.SHUFFLE_NOTIFY_READERROR, true);
}
public synchronized void copySucceeded(TaskAttemptID mapId,
@@ -175,7 +182,6 @@
}
}
- // Notify the JobTracker after every 'reportFailureLimit' failures
checkAndInformJobTracker(failures, mapId, readError);
checkReducerHealth();
@@ -188,9 +194,14 @@
failedShuffleCounter.increment(1);
}
+ // Notify the JobTracker either:
+ // - after every read error, if 'reportReadErrorImmediately' is true; or
+ // - after every 'maxFetchFailuresBeforeReporting' failures.
private void checkAndInformJobTracker(
int failures, TaskAttemptID mapId, boolean readError) {
- if (readError || ((failures % REPORT_FAILURE_LIMIT) == 0)) {
+ if ((reportReadErrorImmediately && readError)
+ || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
+ LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) mapId);
}
}