Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2011/06/04 03:00:10 UTC
svn commit: r1131299 - in /hadoop/common/branches/branch-0.20-security:
CHANGES.txt src/mapred/mapred-default.xml
src/mapred/org/apache/hadoop/mapred/ReduceTask.java
src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java
Author: cdouglas
Date: Sat Jun 4 01:00:10 2011
New Revision: 1131299
URL: http://svn.apache.org/viewvc?rev=1131299&view=rev
Log:
MAPREDUCE-2524. Port reduce failure reporting semantics from trunk, to
fail faulty maps more aggressively. Contributed by Thomas Graves
Added:
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1131299&r1=1131298&r2=1131299&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Sat Jun 4 01:00:10 2011
@@ -53,6 +53,9 @@ Release 0.20.205.0 - unreleased
HADOOP-7144. Expose JMX metrics via JSON servlet. (Robert Joseph Evans via
cdouglas)
+ MAPREDUCE-2524. Port reduce failure reporting semantics from trunk, to
+ fail faulty maps more aggressively. (Thomas Graves via cdouglas)
+
Release 0.20.204.0 - unreleased
NEW FEATURES
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml?rev=1131299&r1=1131298&r2=1131299&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml Sat Jun 4 01:00:10 2011
@@ -310,12 +310,11 @@
</property>
<property>
- <name>mapred.reduce.copy.backoff</name>
- <value>300</value>
- <description>The maximum amount of time (in seconds) a reducer spends on
- fetching one map output before declaring it as failed.
- </description>
-</property>
+ <name>mapreduce.reduce.shuffle.maxfetchfailures</name>
+ <value>10</value>
+ <description>The maximum number of times a reducer tries to
+ fetch a map output before it reports it.
+</description></property>
<property>
<name>mapreduce.reduce.shuffle.connect.timeout</name>
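[Editorial note: this hunk swaps the time-based mapred.reduce.copy.backoff knob for a
count-based threshold. For context, a minimal sketch of overriding the new settings per
job, assuming the standard 0.20 JobConf API; the demo class itself is hypothetical and
the values are illustrative, not recommendations:

import org.apache.hadoop.mapred.JobConf;

// Hypothetical driver, for illustration only.
public class ShuffleTuningDemo {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // Report a flaky map after 5 failed fetches instead of the default 10.
    job.setInt("mapreduce.reduce.shuffle.maxfetchfailures", 5);
    // Leave immediate read-error reporting on (see the
    // mapreduce.reduce.shuffle.notify.readerror checks in the test added below).
    job.setBoolean("mapreduce.reduce.shuffle.notify.readerror", true);
  }
}
]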
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1131299&r1=1131298&r2=1131299&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Sat Jun 4 01:00:10 2011
@@ -696,11 +696,6 @@ class ReduceTask extends Task {
*/
private int maxInFlight;
- /**
- * the amount of time spent on fetching one map output before considering
- * it as failed and notifying the jobtracker about it.
- */
- private int maxBackoff;
/**
* busy hosts from which copies are being backed off
@@ -802,11 +797,31 @@ class ReduceTask extends Task {
private int maxMapRuntime;
/**
- * Maximum number of fetch-retries per-map.
+ * Maximum number of fetch-retries per-map before reporting it.
*/
- private volatile int maxFetchRetriesPerMap;
+ private int maxFetchFailuresBeforeReporting;
/**
+ * Maximum number of fetch failures before reducer aborts.
+ */
+ private final int abortFailureLimit;
+
+ /**
+ * Initial penalty time in ms for a fetch failure.
+ */
+ private static final long INITIAL_PENALTY = 10000;
+
+ /**
+ * Penalty growth rate for each fetch failure.
+ */
+ private static final float PENALTY_GROWTH_RATE = 1.3f;
+
+ /**
+ * Default limit for maximum number of fetch failures before reporting.
+ */
+ private final static int REPORT_FAILURE_LIMIT = 10;
+
+ /**
* Combiner runner, if a combiner is needed
*/
private CombinerRunner combinerRunner;
@@ -1906,7 +1921,6 @@ class ReduceTask extends Task {
this.copyResults = new ArrayList<CopyResult>(100);
this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
this.maxInFlight = 4 * numCopiers;
- this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
Counters.Counter combineInputCounter =
reporter.getCounter(Task.Counter.COMBINE_INPUT_RECORDS);
this.combinerRunner = CombinerRunner.create(conf, getTaskID(),
@@ -1918,18 +1932,12 @@ class ReduceTask extends Task {
}
this.ioSortFactor = conf.getInt("io.sort.factor", 10);
- // the exponential backoff formula
- // backoff (t) = init * base^(t-1)
- // so for max retries we get
- // backoff(1) + .... + backoff(max_fetch_retries) ~ max
- // solving which we get
- // max_fetch_retries ~ log((max * (base - 1) / init) + 1) / log(base)
- // for the default value of max = 300 (5min) we get max_fetch_retries = 6
- // the order is 4,8,16,32,64,128. sum of which is 252 sec = 4.2 min
-
- // optimizing for the base 2
- this.maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP,
- getClosestPowerOf2((this.maxBackoff * 1000 / BACKOFF_INIT) + 1));
+
+ this.abortFailureLimit = Math.max(30, numMaps / 10);
+
+ this.maxFetchFailuresBeforeReporting = conf.getInt(
+ "mapreduce.reduce.shuffle.maxfetchfailures", REPORT_FAILURE_LIMIT);
+
this.maxFailedUniqueFetches = Math.min(numMaps,
this.maxFailedUniqueFetches);
this.maxInMemOutputs = conf.getInt("mapred.inmem.merge.threshold", 1000);
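[Editorial note: two separate thresholds are set in this hunk. Reporting to the
JobTracker is driven by the configurable maxFetchFailuresBeforeReporting, while the hard
abort limit scales with job size and never drops below 30. A small sketch of that
arithmetic, using the same expression as the hunk above; the numMaps values are
illustrative:

public class AbortLimitDemo {
  public static void main(String[] args) {
    for (int numMaps : new int[] {10, 100, 1000, 10000}) {
      // Same expression as in the patch: a floor of 30, then one tenth of the maps.
      int abortFailureLimit = Math.max(30, numMaps / 10);
      System.out.println(numMaps + " maps -> abort after "
          + abortFailureLimit + " fetch failures");
    }
    // Prints: 10 -> 30, 100 -> 30, 1000 -> 100, 10000 -> 1000
  }
}
]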
@@ -2219,44 +2227,19 @@ class ReduceTask extends Task {
LOG.info("Task " + getTaskID() + ": Failed fetch #" +
noFailedFetches + " from " + mapTaskId);
- // half the number of max fetch retries per map during
- // the end of shuffle
- int fetchRetriesPerMap = maxFetchRetriesPerMap;
- int pendingCopies = numMaps - numCopied;
-
- // The check noFailedFetches != maxFetchRetriesPerMap is
- // required to make sure of the notification in case of a
- // corner case :
- // when noFailedFetches reached maxFetchRetriesPerMap and
- // reducer reached the end of shuffle, then we may miss sending
- // a notification if the difference between
- // noFailedFetches and fetchRetriesPerMap is not divisible by 2
- if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT &&
- noFailedFetches != maxFetchRetriesPerMap) {
- fetchRetriesPerMap = fetchRetriesPerMap >> 1;
+ if (noFailedFetches >= abortFailureLimit) {
+ LOG.fatal(noFailedFetches + " failures downloading "
+ + getTaskID() + ".");
+ umbilical.shuffleError(getTaskID(),
+ "Exceeded the abort failure limit;"
+ + " bailing-out.", jvmContext);
}
- // did the fetch fail too many times?
- // using a hybrid technique for notifying the jobtracker.
- // a. the first notification is sent after max-retries
- // b. subsequent notifications are sent after 2 retries.
- // c. send notification immediately if it is a read error and
- // "mapreduce.reduce.shuffle.notify.readerror" set true.
- if ((reportReadErrorImmediately && cr.getError().equals(
- CopyOutputErrorType.READ_ERROR)) ||
- ((noFailedFetches >= fetchRetriesPerMap)
- && ((noFailedFetches - fetchRetriesPerMap) % 2) == 0)) {
- synchronized (ReduceTask.this) {
- taskStatus.addFetchFailedMap(mapTaskId);
- reporter.progress();
- LOG.info("Failed to fetch map-output from " + mapTaskId +
- " even after MAX_FETCH_RETRIES_PER_MAP retries... "
- + " or it is a read error, "
- + " reporting to the JobTracker");
- }
- }
+ checkAndInformJobTracker(noFailedFetches, mapTaskId,
+ cr.getError().equals(CopyOutputErrorType.READ_ERROR));
+
// note unique failed-fetch maps
- if (noFailedFetches == maxFetchRetriesPerMap) {
+ if (noFailedFetches == maxFetchFailuresBeforeReporting) {
fetchFailedMaps.add(mapId);
// did we have too many unique failed-fetch maps?
@@ -2302,26 +2285,12 @@ class ReduceTask extends Task {
"Exceeded MAX_FAILED_UNIQUE_FETCHES;"
+ " bailing-out.", jvmContext);
}
+
}
- // back off exponentially until num_retries <= max_retries
- // back off by max_backoff/2 on subsequent failed attempts
currentTime = System.currentTimeMillis();
- int currentBackOff = noFailedFetches <= fetchRetriesPerMap
- ? BACKOFF_INIT
- * (1 << (noFailedFetches - 1))
- : (this.maxBackoff * 1000 / 2);
- // If it is read error,
- // back off for maxMapRuntime/2
- // during end of shuffle,
- // backoff for min(maxMapRuntime/2, currentBackOff)
- if (cr.getError().equals(CopyOutputErrorType.READ_ERROR)) {
- int backOff = maxMapRuntime >> 1;
- if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT) {
- backOff = Math.min(backOff, currentBackOff);
- }
- currentBackOff = backOff;
- }
+ long currentBackOff = (long)(INITIAL_PENALTY *
+ Math.pow(PENALTY_GROWTH_RATE, noFailedFetches));
penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
LOG.warn(reduceTask.getTaskID() + " adding host " +
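[Editorial note: the new penalty is a flat exponential in the per-map failure count,
replacing the old read-error-specific backoff deleted above. With INITIAL_PENALTY =
10000 ms and PENALTY_GROWTH_RATE = 1.3f as declared earlier in this patch, a sketch of
the resulting penalty-box schedule; the demo class is hypothetical but the formula is
the one fed into penaltyBox.put() above:

public class PenaltyScheduleDemo {
  private static final long INITIAL_PENALTY = 10000;      // ms, as in the patch
  private static final float PENALTY_GROWTH_RATE = 1.3f;  // as in the patch

  public static void main(String[] args) {
    for (int noFailedFetches = 1; noFailedFetches <= 10; noFailedFetches++) {
      // Same expression as in the hunk above.
      long currentBackOff = (long) (INITIAL_PENALTY *
          Math.pow(PENALTY_GROWTH_RATE, noFailedFetches));
      System.out.println("failure " + noFailedFetches
          + " -> host penalized " + currentBackOff + " ms");
    }
    // failure 1 -> 13000 ms, failure 5 -> 37129 ms, failure 10 -> 137858 ms
  }
}
]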
@@ -2386,6 +2355,26 @@ class ReduceTask extends Task {
return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
}
+ // Notify the JobTracker
+ // after every read error, if 'reportReadErrorImmediately' is true or
+ // after every 'maxFetchFailuresBeforeReporting' failures
+ protected void checkAndInformJobTracker(
+ int failures, TaskAttemptID mapId, boolean readError) {
+ if ((reportReadErrorImmediately && readError)
+ || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
+ synchronized (ReduceTask.this) {
+ taskStatus.addFetchFailedMap(mapId);
+ reporter.progress();
+ LOG.info("Failed to fetch map-output from " + mapId +
+ " even after MAX_FETCH_RETRIES_PER_MAP retries... "
+ + " or it is a read error, "
+ + " reporting to the JobTracker");
+ }
+ }
+ }
+
+
+
private long createInMemorySegments(
List<Segment<K, V>> inMemorySegments, long leaveBytes)
throws IOException {
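[Editorial note: checkAndInformJobTracker above is invoked once per failed fetch, so
with the default limit of 10 the JobTracker is notified on failures 10, 20, 30, and so
on, plus immediately on every read error when reportReadErrorImmediately is set. A
sketch of just the reporting predicate, extracted for clarity; the field names mirror
the patch, but shouldReport itself is a hypothetical helper, not part of the commit:

// Hypothetical extraction of the reporting condition, for illustration only.
static boolean shouldReport(int failures, boolean readError,
    boolean reportReadErrorImmediately, int maxFetchFailuresBeforeReporting) {
  return (reportReadErrorImmediately && readError)
      || (failures % maxFetchFailuresBeforeReporting == 0);
}

With maxFetchFailuresBeforeReporting = 3, as configured partway through the test added
below, reports fire at failures 3, 6, 9, ... matching the times(n) expectations there.]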
@@ -2887,13 +2876,6 @@ class ReduceTask extends Task {
URI u = URI.create(event.getTaskTrackerHttp());
String host = u.getHost();
TaskAttemptID taskId = event.getTaskAttemptId();
- int duration = event.getTaskRunTime();
- if (duration > maxMapRuntime) {
- maxMapRuntime = duration;
- // adjust max-fetch-retries based on max-map-run-time
- maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP,
- getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1));
- }
URL mapOutputLocation = new URL(event.getTaskTrackerHttp() +
"/mapOutput?job=" + taskId.getJobID() +
"&map=" + taskId +
Added: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java?rev=1131299&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java Sat Jun 4 01:00:10 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.ReduceTask;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+
+
+
+public class TestReduceTaskFetchFail {
+
+ public static class TestReduceTask extends ReduceTask {
+ public TestReduceTask() {
+ super();
+ }
+ public String getJobFile() { return "/foo"; }
+
+ public class TestReduceCopier extends ReduceCopier {
+ public TestReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
+ TaskReporter reporter
+ )throws ClassNotFoundException, IOException {
+ super(umbilical, conf, reporter);
+ }
+
+ public void checkAndInformJobTracker(int failures, TaskAttemptID mapId, boolean readError) {
+ super.checkAndInformJobTracker(failures, mapId, readError);
+ }
+
+ }
+
+ }
+
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testcheckAndInformJobTracker() throws Exception {
+ //mock creation
+ TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+ TaskReporter mockTaskReporter = mock(TaskReporter.class);
+
+ JobConf conf = new JobConf();
+ conf.setUser("testuser");
+ conf.setJobName("testJob");
+ conf.setSessionId("testSession");
+
+ TaskAttemptID tid = new TaskAttemptID();
+ TestReduceTask rTask = new TestReduceTask();
+ rTask.setConf(conf);
+
+ ReduceTask.ReduceCopier reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);
+ reduceCopier.checkAndInformJobTracker(1, tid, false);
+
+ verify(mockTaskReporter, never()).progress();
+
+ reduceCopier.checkAndInformJobTracker(10, tid, false);
+ verify(mockTaskReporter, times(1)).progress();
+
+ // Test the config setting
+ conf.setInt("mapreduce.reduce.shuffle.maxfetchfailures", 3);
+
+ rTask.setConf(conf);
+ reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);
+
+ reduceCopier.checkAndInformJobTracker(1, tid, false);
+ verify(mockTaskReporter, times(1)).progress();
+
+ reduceCopier.checkAndInformJobTracker(3, tid, false);
+ verify(mockTaskReporter, times(2)).progress();
+
+ reduceCopier.checkAndInformJobTracker(5, tid, false);
+ verify(mockTaskReporter, times(2)).progress();
+
+ reduceCopier.checkAndInformJobTracker(6, tid, false);
+ verify(mockTaskReporter, times(3)).progress();
+
+ // test readError and its config
+ reduceCopier.checkAndInformJobTracker(7, tid, true);
+ verify(mockTaskReporter, times(4)).progress();
+
+ conf.setBoolean("mapreduce.reduce.shuffle.notify.readerror", false);
+
+ rTask.setConf(conf);
+ reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);
+
+ reduceCopier.checkAndInformJobTracker(7, tid, true);
+ verify(mockTaskReporter, times(4)).progress();
+
+ }
+}
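[Editorial note: on branch-0.20-security, which predates the maven layout, this test
would typically be run through the ant build, e.g.
ant test-core -Dtestcase=TestReduceTaskFetchFail
The exact target name is an assumption; it varies across 0.20-era branches.]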