You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2011/06/04 03:00:10 UTC

svn commit: r1131299 - in /hadoop/common/branches/branch-0.20-security: CHANGES.txt src/mapred/mapred-default.xml src/mapred/org/apache/hadoop/mapred/ReduceTask.java src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java

Author: cdouglas
Date: Sat Jun  4 01:00:10 2011
New Revision: 1131299

URL: http://svn.apache.org/viewvc?rev=1131299&view=rev
Log:
MAPREDUCE-2524. Port reduce failure reporting semantics from trunk, to
fail faulty maps more aggressively. Contributed by Thomas Graves

Added:
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java
Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1131299&r1=1131298&r2=1131299&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Sat Jun  4 01:00:10 2011
@@ -53,6 +53,9 @@ Release 0.20.205.0 - unreleased
     HADOOP-7144. Expose JMX metrics via JSON servlet. (Robert Joseph Evans via
     cdouglas)
 
+    MAPREDUCE-2524. Port reduce failure reporting semantics from trunk, to
+    fail faulty maps more aggressively. (Thomas Graves via cdouglas)
+
 Release 0.20.204.0 - unreleased
 
   NEW FEATURES

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml?rev=1131299&r1=1131298&r2=1131299&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml Sat Jun  4 01:00:10 2011
@@ -310,12 +310,11 @@
 </property>
 
 <property>
-  <name>mapred.reduce.copy.backoff</name>
-  <value>300</value>
-  <description>The maximum amount of time (in seconds) a reducer spends on 
-  fetching one map output before declaring it as failed.
-  </description>
-</property>
+  <name>mapreduce.reduce.shuffle.maxfetchfailures</name>
+  <value>10</value>
+  <description>The number of failed attempts to fetch a map output after
+  which the reducer reports the failure to the JobTracker.
+</description></property>
 
 <property>
   <name>mapreduce.reduce.shuffle.connect.timeout</name>

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1131299&r1=1131298&r2=1131299&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Sat Jun  4 01:00:10 2011
@@ -696,11 +696,6 @@ class ReduceTask extends Task {
      */
     private int maxInFlight;
     
-    /**
-     * the amount of time spent on fetching one map output before considering 
-     * it as failed and notifying the jobtracker about it.
-     */
-    private int maxBackoff;
     
     /**
      * busy hosts from which copies are being backed off
@@ -802,11 +797,31 @@ class ReduceTask extends Task {
     private int maxMapRuntime;
     
     /**
-     * Maximum number of fetch-retries per-map.
+     * Maximum number of fetch-retries per-map before reporting it.
      */
-    private volatile int maxFetchRetriesPerMap;
+    private int maxFetchFailuresBeforeReporting;
     
     /**
+     * Maximum number of fetch failures before reducer aborts.
+     */
+    private final int abortFailureLimit;
+
+    /**
+     * Initial penalty time in ms for a fetch failure.
+     */
+    private static final long INITIAL_PENALTY = 10000;
+
+    /**
+     * Penalty growth rate for each fetch failure.
+     */
+    private static final float PENALTY_GROWTH_RATE = 1.3f;
+
+    /**
+     * Default limit for maximum number of fetch failures before reporting.
+     */
+    private final static int REPORT_FAILURE_LIMIT = 10;
+
+    /**
      * Combiner runner, if a combiner is needed
      */
     private CombinerRunner combinerRunner;
@@ -1906,7 +1921,6 @@ class ReduceTask extends Task {
       this.copyResults = new ArrayList<CopyResult>(100);    
       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
       this.maxInFlight = 4 * numCopiers;
-      this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
       Counters.Counter combineInputCounter = 
         reporter.getCounter(Task.Counter.COMBINE_INPUT_RECORDS);
       this.combinerRunner = CombinerRunner.create(conf, getTaskID(),
@@ -1918,18 +1932,12 @@ class ReduceTask extends Task {
       }
       
       this.ioSortFactor = conf.getInt("io.sort.factor", 10);
-      // the exponential backoff formula
-      //    backoff (t) = init * base^(t-1)
-      // so for max retries we get
-      //    backoff(1) + .... + backoff(max_fetch_retries) ~ max
-      // solving which we get
-      //    max_fetch_retries ~ log((max * (base - 1) / init) + 1) / log(base)
-      // for the default value of max = 300 (5min) we get max_fetch_retries = 6
-      // the order is 4,8,16,32,64,128. sum of which is 252 sec = 4.2 min
-      
-      // optimizing for the base 2
-      this.maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP, 
-             getClosestPowerOf2((this.maxBackoff * 1000 / BACKOFF_INIT) + 1));
+      
+      this.abortFailureLimit = Math.max(30, numMaps / 10);
+
+      this.maxFetchFailuresBeforeReporting = conf.getInt(
+          "mapreduce.reduce.shuffle.maxfetchfailures", REPORT_FAILURE_LIMIT);
+
       this.maxFailedUniqueFetches = Math.min(numMaps, 
                                              this.maxFailedUniqueFetches);
       this.maxInMemOutputs = conf.getInt("mapred.inmem.merge.threshold", 1000);
@@ -2219,44 +2227,19 @@ class ReduceTask extends Task {
               LOG.info("Task " + getTaskID() + ": Failed fetch #" + 
                        noFailedFetches + " from " + mapTaskId);
 
-              // half the number of max fetch retries per map during 
-              // the end of shuffle
-              int fetchRetriesPerMap = maxFetchRetriesPerMap;
-              int pendingCopies = numMaps - numCopied;
-              
-              // The check noFailedFetches != maxFetchRetriesPerMap is
-              // required to make sure of the notification in case of a
-              // corner case : 
-              // when noFailedFetches reached maxFetchRetriesPerMap and 
-              // reducer reached the end of shuffle, then we may miss sending
-              // a notification if the difference between 
-              // noFailedFetches and fetchRetriesPerMap is not divisible by 2 
-              if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT &&
-                  noFailedFetches != maxFetchRetriesPerMap) {
-                fetchRetriesPerMap = fetchRetriesPerMap >> 1;
+              if (noFailedFetches >= abortFailureLimit) {
+                LOG.fatal(noFailedFetches + " failures downloading "
+                          + getTaskID() + ".");
+                umbilical.shuffleError(getTaskID(),
+                                 "Exceeded the abort failure limit;"
+                                 + " bailing-out.", jvmContext);
               }
               
-              // did the fetch fail too many times?
-              // using a hybrid technique for notifying the jobtracker.
-              //   a. the first notification is sent after max-retries 
-              //   b. subsequent notifications are sent after 2 retries.   
-              //   c. send notification immediately if it is a read error and 
-              //       "mapreduce.reduce.shuffle.notify.readerror" set true.   
-              if ((reportReadErrorImmediately && cr.getError().equals(
-                  CopyOutputErrorType.READ_ERROR)) ||
-                 ((noFailedFetches >= fetchRetriesPerMap) 
-                  && ((noFailedFetches - fetchRetriesPerMap) % 2) == 0)) {
-                synchronized (ReduceTask.this) {
-                  taskStatus.addFetchFailedMap(mapTaskId);
-                  reporter.progress();
-                  LOG.info("Failed to fetch map-output from " + mapTaskId + 
-                           " even after MAX_FETCH_RETRIES_PER_MAP retries... "
-                           + " or it is a read error, "
-                           + " reporting to the JobTracker");
-                }
-              }
+              checkAndInformJobTracker(noFailedFetches, mapTaskId,
+                  cr.getError().equals(CopyOutputErrorType.READ_ERROR));
+
               // note unique failed-fetch maps
-              if (noFailedFetches == maxFetchRetriesPerMap) {
+              if (noFailedFetches == maxFetchFailuresBeforeReporting) {
                 fetchFailedMaps.add(mapId);
                   
                 // did we have too many unique failed-fetch maps?
@@ -2302,26 +2285,12 @@ class ReduceTask extends Task {
                                          "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
                                          + " bailing-out.", jvmContext);
                 }
+
               }
                 
-              // back off exponentially until num_retries <= max_retries
-              // back off by max_backoff/2 on subsequent failed attempts
               currentTime = System.currentTimeMillis();
-              int currentBackOff = noFailedFetches <= fetchRetriesPerMap 
-                                   ? BACKOFF_INIT 
-                                     * (1 << (noFailedFetches - 1)) 
-                                   : (this.maxBackoff * 1000 / 2);
-              // If it is read error,
-              //    back off for maxMapRuntime/2
-              //    during end of shuffle, 
-              //      backoff for min(maxMapRuntime/2, currentBackOff) 
-              if (cr.getError().equals(CopyOutputErrorType.READ_ERROR)) {
-                int backOff = maxMapRuntime >> 1;
-                if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT) {
-                  backOff = Math.min(backOff, currentBackOff); 
-                } 
-                currentBackOff = backOff;
-              }
+              long currentBackOff = (long)(INITIAL_PENALTY *
+                  Math.pow(PENALTY_GROWTH_RATE, noFailedFetches));
 
               penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
               LOG.warn(reduceTask.getTaskID() + " adding host " +
@@ -2386,6 +2355,26 @@ class ReduceTask extends Task {
         return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
     }
     
+    // Records 'mapId' as a fetch-failed map for the JobTracker
+    //  - after every read error, if 'reportReadErrorImmediately' is true, or
+    //  - after every 'maxFetchFailuresBeforeReporting' failures; a limit of 0
+    //    reports every failure instead of failing on a modulo by zero.
+    protected void checkAndInformJobTracker(
+        int failures, TaskAttemptID mapId, boolean readError) {
+      if ((reportReadErrorImmediately && readError)
+          || maxFetchFailuresBeforeReporting == 0
+          || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
+        synchronized (ReduceTask.this) {
+          taskStatus.addFetchFailedMap(mapId);
+          reporter.progress();
+          LOG.info("Failed to fetch map-output from " + mapId + " after "
+                   + failures + " failed attempt(s)"
+                   + (readError ? " (read error)" : "")
+                   + "; reporting to the JobTracker");
+        }
+      }
+    }
+
     private long createInMemorySegments(
         List<Segment<K, V>> inMemorySegments, long leaveBytes)
         throws IOException {
@@ -2887,13 +2876,6 @@ class ReduceTask extends Task {
               URI u = URI.create(event.getTaskTrackerHttp());
               String host = u.getHost();
               TaskAttemptID taskId = event.getTaskAttemptId();
-              int duration = event.getTaskRunTime();
-              if (duration > maxMapRuntime) {
-                maxMapRuntime = duration; 
-                // adjust max-fetch-retries based on max-map-run-time
-                maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP, 
-                  getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1));
-              }
               URL mapOutputLocation = new URL(event.getTaskTrackerHttp() + 
                                       "/mapOutput?job=" + taskId.getJobID() +
                                       "&map=" + taskId + 

Added: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java?rev=1131299&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestReduceTaskFetchFail.java Sat Jun  4 01:00:10 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.ReduceTask;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+
+
+
+public class TestReduceTaskFetchFail {
+  // ReduceTask stub used to construct the inner copier under test.
+  public static class TestReduceTask extends ReduceTask {
+    public TestReduceTask() {
+       super();
+    }
+    public String getJobFile() { return "/foo"; }
+
+    public class TestReduceCopier extends ReduceCopier {
+      public TestReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
+                        TaskReporter reporter
+                        )throws ClassNotFoundException, IOException {
+        super(umbilical, conf, reporter);
+      }
+      // Re-declares the protected method as public and delegates to super.
+      public void checkAndInformJobTracker(int failures, TaskAttemptID mapId, boolean readError) {
+        super.checkAndInformJobTracker(failures, mapId, readError);
+      }
+
+    }
+
+  }
+  // Checks when checkAndInformJobTracker reports a failed fetch, using the
+  // number of reporter.progress() calls on the mock as the signal.
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testcheckAndInformJobTracker() throws Exception {
+    // mock the umbilical and the reporter; only reporter.progress() is verified
+    TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+    TaskReporter mockTaskReporter = mock(TaskReporter.class);
+
+    JobConf conf = new JobConf();
+    conf.setUser("testuser");
+    conf.setJobName("testJob");
+    conf.setSessionId("testSession");
+
+    TaskAttemptID tid =  new TaskAttemptID();
+    TestReduceTask rTask = new TestReduceTask();
+    rTask.setConf(conf);
+
+    ReduceTask.ReduceCopier reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);
+    reduceCopier.checkAndInformJobTracker(1, tid, false);
+    // 1 is not a multiple of the default limit (10): no report expected
+    verify(mockTaskReporter, never()).progress();
+    // 10 is a multiple of the default limit: first report
+    reduceCopier.checkAndInformJobTracker(10, tid, false);
+    verify(mockTaskReporter, times(1)).progress();
+
+    // lower the reporting limit to 3 through the configuration
+    conf.setInt("mapreduce.reduce.shuffle.maxfetchfailures", 3);
+    // the new copier reads the limit from conf; the same mock is reused
+    rTask.setConf(conf);
+    reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);
+    // expected counts below are cumulative because the mock is shared
+    reduceCopier.checkAndInformJobTracker(1, tid, false);
+    verify(mockTaskReporter, times(1)).progress();
+
+    reduceCopier.checkAndInformJobTracker(3, tid, false);
+    verify(mockTaskReporter, times(2)).progress();
+
+    reduceCopier.checkAndInformJobTracker(5, tid, false);
+    verify(mockTaskReporter, times(2)).progress();
+
+    reduceCopier.checkAndInformJobTracker(6, tid, false);
+    verify(mockTaskReporter, times(3)).progress();
+    // a read error at a non-multiple of the limit (7) is still reported; this
+    // relies on notify.readerror being enabled by default (not shown here)
+    reduceCopier.checkAndInformJobTracker(7, tid, true);
+    verify(mockTaskReporter, times(4)).progress();
+    // with notify.readerror disabled, a read error alone must not report
+    conf.setBoolean("mapreduce.reduce.shuffle.notify.readerror", false);
+
+    rTask.setConf(conf);
+    reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);
+
+    reduceCopier.checkAndInformJobTracker(7, tid, true);
+    verify(mockTaskReporter, times(4)).progress();
+
+  }
+}