You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/11/08 16:26:53 UTC
svn commit: r1407128 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-client-ap...
Author: bobby
Date: Thu Nov 8 15:26:52 2012
New Revision: 1407128
URL: http://svn.apache.org/viewvc?rev=1407128&view=rev
Log:
svn merge -c 1407118 FIXES: MAPREDUCE-4772. Fetch failures can take way too long for a map to be restarted (bobby)
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1407128&r1=1407127&r2=1407128&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Nov 8 15:26:52 2012
@@ -73,6 +73,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4771. KeyFieldBasedPartitioner not partitioning properly when
configured (jlowe via bobby)
+
+ MAPREDUCE-4772. Fetch failures can take way too long for a map to be
+ restarted (bobby)
Release 0.23.4 - UNRELEASED
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1407128&r1=1407127&r2=1407128&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Thu Nov 8 15:26:52 2012
@@ -68,6 +68,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -78,6 +79,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -1343,16 +1345,22 @@ public class JobImpl implements org.apac
fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
job.fetchFailuresMapping.put(mapId, fetchFailures);
- //get number of running reduces
- int runningReduceTasks = 0;
+ //get number of shuffling reduces
+ int shufflingReduceTasks = 0;
for (TaskId taskId : job.reduceTasks) {
- if (TaskState.RUNNING.equals(job.tasks.get(taskId).getState())) {
- runningReduceTasks++;
+ Task task = job.tasks.get(taskId);
+ if (TaskState.RUNNING.equals(task.getState())) {
+ for(TaskAttempt attempt : task.getAttempts().values()) {
+ if(attempt.getReport().getPhase() == Phase.SHUFFLE) {
+ shufflingReduceTasks++;
+ break;
+ }
+ }
}
}
- float failureRate = runningReduceTasks == 0 ? 1.0f :
- (float) fetchFailures / runningReduceTasks;
+ float failureRate = shufflingReduceTasks == 0 ? 1.0f :
+ (float) fetchFailures / shufflingReduceTasks;
// declare faulty if fetch-failures >= max-allowed-failures
boolean isMapFaulty =
(failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java?rev=1407128&r1=1407127&r2=1407128&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java Thu Nov 8 15:26:52 2012
@@ -18,14 +18,19 @@
package org.apache.hadoop.mapreduce.v2.app;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -37,6 +42,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Assert;
import org.junit.Test;
@@ -254,6 +260,169 @@ public class TestFetchFailure {
events = job.getTaskAttemptCompletionEvents(0, 100);
Assert.assertEquals("Num completion events not correct", 2, events.length);
}
+
+ @Test
+ public void testFetchFailureMultipleReduces() throws Exception {
+ MRApp app = new MRApp(1, 3, false, this.getClass().getName(), true);
+ Configuration conf = new Configuration();
+ // map -> reduce -> fetch-failure -> map retry is incompatible with
+ // sequential, single-task-attempt approach in uber-AM, so disable:
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ //all maps would be running
+ Assert.assertEquals("Num tasks not correct",
+ 4, job.getTasks().size());
+ Iterator<Task> it = job.getTasks().values().iterator();
+ Task mapTask = it.next();
+ Task reduceTask = it.next();
+ Task reduceTask2 = it.next();
+ Task reduceTask3 = it.next();
+
+ //wait for Task state move to RUNNING
+ app.waitForState(mapTask, TaskState.RUNNING);
+ TaskAttempt mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+ app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+ //send the done signal to the map attempt
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(mapAttempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ // wait for map success
+ app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+ TaskAttemptCompletionEvent[] events =
+ job.getTaskAttemptCompletionEvents(0, 100);
+ Assert.assertEquals("Num completion events not correct",
+ 1, events.length);
+ Assert.assertEquals("Event status not correct",
+ TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].getStatus());
+
+ // wait for reduce to start running
+ app.waitForState(reduceTask, TaskState.RUNNING);
+ app.waitForState(reduceTask2, TaskState.RUNNING);
+ app.waitForState(reduceTask3, TaskState.RUNNING);
+ TaskAttempt reduceAttempt =
+ reduceTask.getAttempts().values().iterator().next();
+ app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+ updateStatus(app, reduceAttempt, Phase.SHUFFLE);
+
+ TaskAttempt reduceAttempt2 =
+ reduceTask2.getAttempts().values().iterator().next();
+ app.waitForState(reduceAttempt2, TaskAttemptState.RUNNING);
+ updateStatus(app, reduceAttempt2, Phase.SHUFFLE);
+
+ TaskAttempt reduceAttempt3 =
+ reduceTask3.getAttempts().values().iterator().next();
+ app.waitForState(reduceAttempt3, TaskAttemptState.RUNNING);
+ updateStatus(app, reduceAttempt3, Phase.SHUFFLE);
+
+ //send 3 fetch failures from reduce to trigger map re execution
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
+
+ //We should not re-launch the map task yet
+ assertEquals(TaskState.SUCCEEDED, mapTask.getState());
+ updateStatus(app, reduceAttempt2, Phase.REDUCE);
+ updateStatus(app, reduceAttempt3, Phase.REDUCE);
+
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
+
+ //wait for map Task state move back to RUNNING
+ app.waitForState(mapTask, TaskState.RUNNING);
+
+ //map attempt must have become FAILED
+ Assert.assertEquals("Map TaskAttempt state not correct",
+ TaskAttemptState.FAILED, mapAttempt1.getState());
+
+ Assert.assertEquals("Num attempts in Map Task not correct",
+ 2, mapTask.getAttempts().size());
+
+ Iterator<TaskAttempt> atIt = mapTask.getAttempts().values().iterator();
+ atIt.next();
+ TaskAttempt mapAttempt2 = atIt.next();
+
+ app.waitForState(mapAttempt2, TaskAttemptState.RUNNING);
+ //send the done signal to the second map attempt
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(mapAttempt2.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ // wait for map success
+ app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+ //send done to reduce
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(reduceAttempt.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //send done to reduce
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(reduceAttempt2.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //send done to reduce
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(reduceAttempt3.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ app.waitForState(job, JobState.SUCCEEDED);
+
+ //previous completion event now becomes obsolete
+ Assert.assertEquals("Event status not correct",
+ TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
+
+ events = job.getTaskAttemptCompletionEvents(0, 100);
+ Assert.assertEquals("Num completion events not correct",
+ 6, events.length);
+ Assert.assertEquals("Event map attempt id not correct",
+ mapAttempt1.getID(), events[0].getAttemptId());
+ Assert.assertEquals("Event map attempt id not correct",
+ mapAttempt1.getID(), events[1].getAttemptId());
+ Assert.assertEquals("Event map attempt id not correct",
+ mapAttempt2.getID(), events[2].getAttemptId());
+ Assert.assertEquals("Event reduce attempt id not correct",
+ reduceAttempt.getID(), events[3].getAttemptId());
+ Assert.assertEquals("Event status not correct for map attempt1",
+ TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
+ Assert.assertEquals("Event status not correct for map attempt1",
+ TaskAttemptCompletionEventStatus.FAILED, events[1].getStatus());
+ Assert.assertEquals("Event status not correct for map attempt2",
+ TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].getStatus());
+ Assert.assertEquals("Event status not correct for reduce attempt1",
+ TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
+
+ TaskAttemptCompletionEvent mapEvents[] =
+ job.getMapAttemptCompletionEvents(0, 2);
+ Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
+ Assert.assertArrayEquals("Unexpected map events",
+ Arrays.copyOfRange(events, 0, 2), mapEvents);
+ mapEvents = job.getMapAttemptCompletionEvents(2, 200);
+ Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
+ Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
+ }
+
+
+ private void updateStatus(MRApp app, TaskAttempt attempt, Phase phase) {
+ TaskAttemptStatusUpdateEvent.TaskAttemptStatus status = new TaskAttemptStatusUpdateEvent.TaskAttemptStatus();
+ status.counters = new Counters();
+ status.fetchFailedMaps = new ArrayList<TaskAttemptId>();
+ status.id = attempt.getID();
+ status.mapFinishTime = 0;
+ status.outputSize = 0;
+ status.phase = phase;
+ status.progress = 0.5f;
+ status.shuffleFinishTime = 0;
+ status.sortFinishTime = 0;
+ status.stateString = "OK";
+ status.taskState = attempt.getState();
+ TaskAttemptStatusUpdateEvent event = new TaskAttemptStatusUpdateEvent(attempt.getID(),
+ status);
+ app.getContext().getEventHandler().handle(event);
+ }
private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt,
TaskAttempt mapAttempt) {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1407128&r1=1407127&r2=1407128&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Nov 8 15:26:52 2012
@@ -262,6 +262,9 @@ public interface MRJobConfig {
public static final String SHUFFLE_FETCH_FAILURES = "mapreduce.reduce.shuffle.maxfetchfailures";
public static final String SHUFFLE_NOTIFY_READERROR = "mapreduce.reduce.shuffle.notify.readerror";
+
+ public static final String MAX_SHUFFLE_FETCH_RETRY_DELAY = "mapreduce.reduce.shuffle.retry-delay.max.ms";
+ public static final long DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY = 60000;
public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1407128&r1=1407127&r2=1407128&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Thu Nov 8 15:26:52 2012
@@ -21,6 +21,7 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
@@ -247,6 +248,7 @@ class Fetcher<K,V> extends Thread {
SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
} catch (IOException ie) {
+ boolean connectExcpt = ie instanceof ConnectException;
ioErrs.increment(1);
LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
" map outputs", ie);
@@ -255,14 +257,14 @@ class Fetcher<K,V> extends Thread {
// indirectly penalizing the host
if (!connectSucceeded) {
for(TaskAttemptID left: remaining) {
- scheduler.copyFailed(left, host, connectSucceeded);
+ scheduler.copyFailed(left, host, connectSucceeded, connectExcpt);
}
} else {
// If we got a read error at this stage, it implies there was a problem
// with the first map, typically lost map. So, penalize only that map
// and add the rest
TaskAttemptID firstMap = maps.get(0);
- scheduler.copyFailed(firstMap, host, connectSucceeded);
+ scheduler.copyFailed(firstMap, host, connectSucceeded, connectExcpt);
}
// Add back all the remaining maps, WITHOUT marking them as failed
@@ -286,7 +288,7 @@ class Fetcher<K,V> extends Thread {
if(failedTasks != null && failedTasks.length > 0) {
LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
for(TaskAttemptID left: failedTasks) {
- scheduler.copyFailed(left, host, true);
+ scheduler.copyFailed(left, host, true, false);
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java?rev=1407128&r1=1407127&r2=1407128&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java Thu Nov 8 15:26:52 2012
@@ -89,6 +89,7 @@ class ShuffleScheduler<K,V> {
private DecimalFormat mbpsFormat = new DecimalFormat("0.00");
private boolean reportReadErrorImmediately = true;
+ private long maxDelay = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY;
public ShuffleScheduler(JobConf job, TaskStatus status,
ExceptionReporter reporter,
@@ -115,6 +116,9 @@ class ShuffleScheduler<K,V> {
MRJobConfig.SHUFFLE_FETCH_FAILURES, REPORT_FAILURE_LIMIT);
this.reportReadErrorImmediately = job.getBoolean(
MRJobConfig.SHUFFLE_NOTIFY_READERROR, true);
+
+ this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
+ MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
}
public synchronized void copySucceeded(TaskAttemptID mapId,
@@ -159,7 +163,7 @@ class ShuffleScheduler<K,V> {
}
public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
- boolean readError) {
+ boolean readError, boolean connectExcpt) {
host.penalize();
int failures = 1;
if (failureCounts.containsKey(mapId)) {
@@ -184,12 +188,15 @@ class ShuffleScheduler<K,V> {
}
}
- checkAndInformJobTracker(failures, mapId, readError);
+ checkAndInformJobTracker(failures, mapId, readError, connectExcpt);
checkReducerHealth();
long delay = (long) (INITIAL_PENALTY *
Math.pow(PENALTY_GROWTH_RATE, failures));
+ if (delay > maxDelay) {
+ delay = maxDelay;
+ }
penalties.add(new Penalty(host, delay));
@@ -200,8 +207,9 @@ class ShuffleScheduler<K,V> {
// after every read error, if 'reportReadErrorImmediately' is true or
// after every 'maxFetchFailuresBeforeReporting' failures
private void checkAndInformJobTracker(
- int failures, TaskAttemptID mapId, boolean readError) {
- if ((reportReadErrorImmediately && readError)
+ int failures, TaskAttemptID mapId, boolean readError,
+ boolean connectExcpt) {
+ if (connectExcpt || (reportReadErrorImmediately && readError)
|| ((failures % maxFetchFailuresBeforeReporting) == 0)) {
LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) mapId);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1407128&r1=1407127&r2=1407128&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Nov 8 15:26:52 2012
@@ -322,6 +322,14 @@
</property>
<property>
+ <name>mapreduce.reduce.shuffle.retry-delay.max.ms</name>
+ <value>60000</value>
+ <description>The maximum number of ms the reducer will delay before retrying
+ to download map data.
+ </description>
+</property>
+
+<property>
<name>mapreduce.reduce.shuffle.parallelcopies</name>
<value>5</value>
<description>The default number of parallel transfers run by reduce
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1407128&r1=1407127&r2=1407128&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Thu Nov 8 15:26:52 2012
@@ -118,8 +118,8 @@ public class TestFetcher {
encHash);
verify(allErrs).increment(1);
- verify(ss).copyFailed(map1ID, host, true);
- verify(ss).copyFailed(map2ID, host, true);
+ verify(ss).copyFailed(map1ID, host, true, false);
+ verify(ss).copyFailed(map2ID, host, true, false);
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
@@ -178,8 +178,8 @@ public class TestFetcher {
.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
encHash);
verify(allErrs, never()).increment(1);
- verify(ss, never()).copyFailed(map1ID, host, true);
- verify(ss, never()).copyFailed(map2ID, host, true);
+ verify(ss, never()).copyFailed(map1ID, host, true, false);
+ verify(ss, never()).copyFailed(map2ID, host, true, false);
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));