You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original archive page and is not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2013/06/16 05:07:57 UTC

svn commit: r1493446 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/

Author: acmurthy
Date: Sun Jun 16 03:07:56 2013
New Revision: 1493446

URL: http://svn.apache.org/r1493446
Log:
Merge -c 1493445 from trunk to branch-2 to fix MAPREDUCE-5192. Allow for alternate resolutions of TaskCompletionEvents. Contributed by Chris Douglas.

Added:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
      - copied unchanged from r1493445, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1493446&r1=1493445&r2=1493446&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Sun Jun 16 03:07:56 2013
@@ -172,6 +172,9 @@ Release 2.1.0-beta - UNRELEASED
     MAPREDUCE-5199. Removing ApplicationTokens file as it is no longer needed.
     (Daryn Sharp via vinodkv)
 
+    MAPREDUCE-5192. Allow for alternate resolutions of TaskCompletionEvents.
+    (cdouglas via acmurthy)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4974. Optimising the LineRecordReader initialize() method 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1493446&r1=1493445&r2=1493446&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Sun Jun 16 03:07:56 2013
@@ -271,12 +271,19 @@
        <Class name="org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl" />
        <Bug pattern="SC_START_IN_CTOR" />
      </Match>
+    <!--
+     This class is unlikely to get subclassed, so ignore
+    -->
+    <Match>
+     <Class name="org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl" />
+     <Bug pattern="SC_START_IN_CTOR" />
+    </Match>
 
     <!--
       Do not bother if equals is not implemented. We will not need it here
     -->
      <Match>
-      <Class name="org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler$Penalty" />
+      <Class name="org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl$Penalty" />
       <Bug pattern="EQ_COMPARETO_USE_OBJECT_EQUALS" />
      </Match>
 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java?rev=1493446&r1=1493445&r2=1493446&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java Sun Jun 16 03:07:56 2013
@@ -18,7 +18,6 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.IOException;
-import java.net.URI;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,11 +36,9 @@ class EventFetcher<K,V> extends Thread {
   private final TaskUmbilicalProtocol umbilical;
   private final ShuffleScheduler<K,V> scheduler;
   private int fromEventIdx = 0;
-  private int maxEventsToFetch;
-  private ExceptionReporter exceptionReporter = null;
+  private final int maxEventsToFetch;
+  private final ExceptionReporter exceptionReporter;
   
-  private int maxMapRuntime = 0;
-
   private volatile boolean stopped = false;
   
   public EventFetcher(TaskAttemptID reduce,
@@ -113,7 +110,8 @@ class EventFetcher<K,V> extends Thread {
    * from a given event ID.
    * @throws IOException
    */  
-  protected int getMapCompletionEvents() throws IOException {
+  protected int getMapCompletionEvents()
+      throws IOException, InterruptedException {
     
     int numNewMaps = 0;
     TaskCompletionEvent events[] = null;
@@ -129,14 +127,7 @@ class EventFetcher<K,V> extends Thread {
       LOG.debug("Got " + events.length + " map completion events from " +
                fromEventIdx);
 
-      // Check if the reset is required.
-      // Since there is no ordering of the task completion events at the
-      // reducer, the only option to sync with the new jobtracker is to reset
-      // the events index
-      if (update.shouldReset()) {
-        fromEventIdx = 0;
-        scheduler.resetKnownMaps();
-      }
+      assert !update.shouldReset() : "Unexpected legacy state";
 
       // Update the last seen event ID
       fromEventIdx += events.length;
@@ -148,49 +139,14 @@ class EventFetcher<K,V> extends Thread {
       // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
       //    outputs at all.
       for (TaskCompletionEvent event : events) {
-        switch (event.getTaskStatus()) {
-        case SUCCEEDED:
-          URI u = getBaseURI(event.getTaskTrackerHttp());
-          scheduler.addKnownMapOutput(u.getHost() + ":" + u.getPort(),
-              u.toString(),
-              event.getTaskAttemptId());
-          numNewMaps ++;
-          int duration = event.getTaskRunTime();
-          if (duration > maxMapRuntime) {
-            maxMapRuntime = duration;
-            scheduler.informMaxMapRunTime(maxMapRuntime);
-          }
-          break;
-        case FAILED:
-        case KILLED:
-        case OBSOLETE:
-          scheduler.obsoleteMapOutput(event.getTaskAttemptId());
-          LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + 
-              " map-task: '" + event.getTaskAttemptId() + "'");
-          break;
-        case TIPFAILED:
-          scheduler.tipFailed(event.getTaskAttemptId().getTaskID());
-          LOG.info("Ignoring output of failed map TIP: '" +  
-              event.getTaskAttemptId() + "'");
-          break;
+        scheduler.resolve(event);
+        if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) {
+          ++numNewMaps;
         }
       }
     } while (events.length == maxEventsToFetch);
 
     return numNewMaps;
   }
-  
-  private URI getBaseURI(String url) {
-    StringBuffer baseUrl = new StringBuffer(url);
-    if (!url.endsWith("/")) {
-      baseUrl.append("/");
-    }
-    baseUrl.append("mapOutput?job=");
-    baseUrl.append(reduce.getJobID());
-    baseUrl.append("&reduce=");
-    baseUrl.append(reduce.getTaskID().getId());
-    baseUrl.append("&map=");
-    URI u = URI.create(baseUrl.toString());
-    return u;
-  }
-}
\ No newline at end of file
+
+}

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1493446&r1=1493445&r2=1493446&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Sun Jun 16 03:07:56 2013
@@ -72,7 +72,7 @@ class Fetcher<K,V> extends Thread {
   private final Counters.Counter wrongMapErrs;
   private final Counters.Counter wrongReduceErrs;
   private final MergeManager<K,V> merger;
-  private final ShuffleScheduler<K,V> scheduler;
+  private final ShuffleSchedulerImpl<K,V> scheduler;
   private final ShuffleClientMetrics metrics;
   private final ExceptionReporter exceptionReporter;
   private final int id;
@@ -90,7 +90,7 @@ class Fetcher<K,V> extends Thread {
   private static SSLFactory sslFactory;
 
   public Fetcher(JobConf job, TaskAttemptID reduceId, 
-                 ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
+                 ShuffleSchedulerImpl<K,V> scheduler, MergeManager<K,V> merger,
                  Reporter reporter, ShuffleClientMetrics metrics,
                  ExceptionReporter exceptionReporter, SecretKey shuffleKey) {
     this.reporter = reporter;

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1493446&r1=1493445&r2=1493446&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Sun Jun 16 03:07:56 2013
@@ -49,7 +49,7 @@ public class Shuffle<K, V> implements Sh
   private ShuffleClientMetrics metrics;
   private TaskUmbilicalProtocol umbilical;
   
-  private ShuffleScheduler<K,V> scheduler;
+  private ShuffleSchedulerImpl<K,V> scheduler;
   private MergeManager<K, V> merger;
   private Throwable throwable = null;
   private String throwingThreadName = null;
@@ -70,8 +70,8 @@ public class Shuffle<K, V> implements Sh
     this.taskStatus = context.getStatus();
     this.reduceTask = context.getReduceTask();
     
-    scheduler = new ShuffleScheduler<K,V>(jobConf, taskStatus, this,
-        copyPhase, context.getShuffledMapsCounter(),
+    scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
+        this, copyPhase, context.getShuffledMapsCounter(),
         context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
     merger = createMergeManager(context);
   }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java?rev=1493446&r1=1493445&r2=1493446&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java Sun Jun 16 03:07:56 2013
@@ -18,432 +18,30 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.IOException;
-import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TaskStatus;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.task.reduce.MapHost.State;
-import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface ShuffleScheduler<K,V> {
 
-class ShuffleScheduler<K,V> {
-  static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>() {
-    protected Long initialValue() {
-      return 0L;
-    }
-  };
-
-  private static final Log LOG = LogFactory.getLog(ShuffleScheduler.class);
-  private static final int MAX_MAPS_AT_ONCE = 20;
-  private static final long INITIAL_PENALTY = 10000;
-  private static final float PENALTY_GROWTH_RATE = 1.3f;
-  private final static int REPORT_FAILURE_LIMIT = 10;
-
-  private final boolean[] finishedMaps;
-  private final int totalMaps;
-  private int remainingMaps;
-  private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
-  private Set<MapHost> pendingHosts = new HashSet<MapHost>();
-  private Set<TaskAttemptID> obsoleteMaps = new HashSet<TaskAttemptID>();
-  
-  private final Random random = new Random(System.currentTimeMillis());
-  private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
-  private final Referee referee = new Referee();
-  private final Map<TaskAttemptID,IntWritable> failureCounts =
-    new HashMap<TaskAttemptID,IntWritable>();
-  private final Map<String,IntWritable> hostFailures = 
-    new HashMap<String,IntWritable>();
-  private final TaskStatus status;
-  private final ExceptionReporter reporter;
-  private final int abortFailureLimit;
-  private final Progress progress;
-  private final Counters.Counter shuffledMapsCounter;
-  private final Counters.Counter reduceShuffleBytes;
-  private final Counters.Counter failedShuffleCounter;
-  
-  private final long startTime;
-  private long lastProgressTime;
-  
-  private int maxMapRuntime = 0;
-  private int maxFailedUniqueFetches = 5;
-  private int maxFetchFailuresBeforeReporting;
-  
-  private long totalBytesShuffledTillNow = 0;
-  private DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
-
-  private boolean reportReadErrorImmediately = true;
-  private long maxDelay = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY;
-  
-  public ShuffleScheduler(JobConf job, TaskStatus status,
-                          ExceptionReporter reporter,
-                          Progress progress,
-                          Counters.Counter shuffledMapsCounter,
-                          Counters.Counter reduceShuffleBytes,
-                          Counters.Counter failedShuffleCounter) {
-    totalMaps = job.getNumMapTasks();
-    abortFailureLimit = Math.max(30, totalMaps / 10);
-    remainingMaps = totalMaps;
-    finishedMaps = new boolean[remainingMaps];
-    this.reporter = reporter;
-    this.status = status;
-    this.progress = progress;
-    this.shuffledMapsCounter = shuffledMapsCounter;
-    this.reduceShuffleBytes = reduceShuffleBytes;
-    this.failedShuffleCounter = failedShuffleCounter;
-    this.startTime = System.currentTimeMillis();
-    lastProgressTime = startTime;
-    referee.start();
-    this.maxFailedUniqueFetches = Math.min(totalMaps,
-        this.maxFailedUniqueFetches);
-    this.maxFetchFailuresBeforeReporting = job.getInt(
-        MRJobConfig.SHUFFLE_FETCH_FAILURES, REPORT_FAILURE_LIMIT);
-    this.reportReadErrorImmediately = job.getBoolean(
-        MRJobConfig.SHUFFLE_NOTIFY_READERROR, true);
-    
-    this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY, 
-        MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
-  }
-
-  public synchronized void copySucceeded(TaskAttemptID mapId, 
-                                         MapHost host,
-                                         long bytes,
-                                         long millis,
-                                         MapOutput<K,V> output
-                                         ) throws IOException {
-    failureCounts.remove(mapId);
-    hostFailures.remove(host.getHostName());
-    int mapIndex = mapId.getTaskID().getId();
-    
-    if (!finishedMaps[mapIndex]) {
-      output.commit();
-      finishedMaps[mapIndex] = true;
-      shuffledMapsCounter.increment(1);
-      if (--remainingMaps == 0) {
-        notifyAll();
-      }
-
-      // update the status
-      totalBytesShuffledTillNow += bytes;
-      updateStatus();
-      reduceShuffleBytes.increment(bytes);
-      lastProgressTime = System.currentTimeMillis();
-      LOG.debug("map " + mapId + " done " + status.getStateString());
-    }
-  }
-  
-  private void updateStatus() {
-    float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
-    int mapsDone = totalMaps - remainingMaps;
-    long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
-
-    float transferRate = mbs / secsSinceStart;
-    progress.set((float) mapsDone / totalMaps);
-    String statusString = mapsDone + " / " + totalMaps + " copied.";
-    status.setStateString(statusString);
-
-    progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
-        + mbpsFormat.format(transferRate) + " MB/s)");
-  }
-
-  public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
-                                      boolean readError, boolean connectExcpt) {
-    host.penalize();
-    int failures = 1;
-    if (failureCounts.containsKey(mapId)) {
-      IntWritable x = failureCounts.get(mapId);
-      x.set(x.get() + 1);
-      failures = x.get();
-    } else {
-      failureCounts.put(mapId, new IntWritable(1));      
-    }
-    String hostname = host.getHostName();
-    if (hostFailures.containsKey(hostname)) {
-      IntWritable x = hostFailures.get(hostname);
-      x.set(x.get() + 1);
-    } else {
-      hostFailures.put(hostname, new IntWritable(1));
-    }
-    if (failures >= abortFailureLimit) {
-      try {
-        throw new IOException(failures + " failures downloading " + mapId);
-      } catch (IOException ie) {
-        reporter.reportException(ie);
-      }
-    }
-    
-    checkAndInformJobTracker(failures, mapId, readError, connectExcpt);
-
-    checkReducerHealth();
-    
-    long delay = (long) (INITIAL_PENALTY *
-        Math.pow(PENALTY_GROWTH_RATE, failures));
-    if (delay > maxDelay) {
-      delay = maxDelay;
-    }
-    
-    penalties.add(new Penalty(host, delay));
-    
-    failedShuffleCounter.increment(1);
-  }
-  
-  // Notify the JobTracker  
-  // after every read error, if 'reportReadErrorImmediately' is true or
-  // after every 'maxFetchFailuresBeforeReporting' failures
-  private void checkAndInformJobTracker(
-      int failures, TaskAttemptID mapId, boolean readError, 
-      boolean connectExcpt) {
-    if (connectExcpt || (reportReadErrorImmediately && readError)
-        || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
-      LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
-      status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) mapId);
-    }
-  }
-    
-  private void checkReducerHealth() {
-    final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
-    final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
-    final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
-
-    long totalFailures = failedShuffleCounter.getValue();
-    int doneMaps = totalMaps - remainingMaps;
-    
-    boolean reducerHealthy =
-      (((float)totalFailures / (totalFailures + doneMaps))
-          < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
-    
-    // check if the reducer has progressed enough
-    boolean reducerProgressedEnough =
-      (((float)doneMaps / totalMaps)
-          >= MIN_REQUIRED_PROGRESS_PERCENT);
-
-    // check if the reducer is stalled for a long time
-    // duration for which the reducer is stalled
-    int stallDuration =
-      (int)(System.currentTimeMillis() - lastProgressTime);
-    
-    // duration for which the reducer ran with progress
-    int shuffleProgressDuration =
-      (int)(lastProgressTime - startTime);
-
-    // min time the reducer should run without getting killed
-    int minShuffleRunDuration =
-      (shuffleProgressDuration > maxMapRuntime)
-      ? shuffleProgressDuration
-          : maxMapRuntime;
-    
-    boolean reducerStalled =
-      (((float)stallDuration / minShuffleRunDuration)
-          >= MAX_ALLOWED_STALL_TIME_PERCENT);
-
-    // kill if not healthy and has insufficient progress
-    if ((failureCounts.size() >= maxFailedUniqueFetches ||
-        failureCounts.size() == (totalMaps - doneMaps))
-        && !reducerHealthy
-        && (!reducerProgressedEnough || reducerStalled)) {
-      LOG.fatal("Shuffle failed with too many fetch failures " +
-      "and insufficient progress!");
-      String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";
-      reporter.reportException(new IOException(errorMsg));
-    }
-
-  }
-  
-  public synchronized void tipFailed(TaskID taskId) {
-    if (!finishedMaps[taskId.getId()]) {
-      finishedMaps[taskId.getId()] = true;
-      if (--remainingMaps == 0) {
-        notifyAll();
-      }
-      updateStatus();
-    }
-  }
-  
-  public synchronized void addKnownMapOutput(String hostName, 
-                                             String hostUrl,
-                                             TaskAttemptID mapId) {
-    MapHost host = mapLocations.get(hostName);
-    if (host == null) {
-      host = new MapHost(hostName, hostUrl);
-      mapLocations.put(hostName, host);
-    }
-    host.addKnownMap(mapId);
-
-    // Mark the host as pending 
-    if (host.getState() == State.PENDING) {
-      pendingHosts.add(host);
-      notifyAll();
-    }
-  }
-  
-  public synchronized void obsoleteMapOutput(TaskAttemptID mapId) {
-    obsoleteMaps.add(mapId);
-  }
-  
-  public synchronized void putBackKnownMapOutput(MapHost host, 
-                                                 TaskAttemptID mapId) {
-    host.addKnownMap(mapId);
-  }
-
-  public synchronized MapHost getHost() throws InterruptedException {
-      while(pendingHosts.isEmpty()) {
-        wait();
-      }
-      
-      MapHost host = null;
-      Iterator<MapHost> iter = pendingHosts.iterator();
-      int numToPick = random.nextInt(pendingHosts.size());
-      for (int i=0; i <= numToPick; ++i) {
-        host = iter.next();
-      }
-      
-      pendingHosts.remove(host);     
-      host.markBusy();
-      
-      LOG.info("Assiging " + host + " with " + host.getNumKnownMapOutputs() + 
-               " to " + Thread.currentThread().getName());
-      shuffleStart.set(System.currentTimeMillis());
-      
-      return host;
-  }
-  
-  public synchronized List<TaskAttemptID> getMapsForHost(MapHost host) {
-    List<TaskAttemptID> list = host.getAndClearKnownMaps();
-    Iterator<TaskAttemptID> itr = list.iterator();
-    List<TaskAttemptID> result = new ArrayList<TaskAttemptID>();
-    int includedMaps = 0;
-    int totalSize = list.size();
-    // find the maps that we still need, up to the limit
-    while (itr.hasNext()) {
-      TaskAttemptID id = itr.next();
-      if (!obsoleteMaps.contains(id) && !finishedMaps[id.getTaskID().getId()]) {
-        result.add(id);
-        if (++includedMaps >= MAX_MAPS_AT_ONCE) {
-          break;
-        }
-      }
-    }
-    // put back the maps left after the limit
-    while (itr.hasNext()) {
-      TaskAttemptID id = itr.next();
-      if (!obsoleteMaps.contains(id) && !finishedMaps[id.getTaskID().getId()]) {
-        host.addKnownMap(id);
-      }
-    }
-    LOG.info("assigned " + includedMaps + " of " + totalSize + " to " +
-             host + " to " + Thread.currentThread().getName());
-    return result;
-  }
-
-  public synchronized void freeHost(MapHost host) {
-    if (host.getState() != State.PENALIZED) {
-      if (host.markAvailable() == State.PENDING) {
-        pendingHosts.add(host);
-        notifyAll();
-      }
-    }
-    LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " + 
-             (System.currentTimeMillis()-shuffleStart.get()) + "ms");
-  }
-    
-  public synchronized void resetKnownMaps() {
-    mapLocations.clear();
-    obsoleteMaps.clear();
-    pendingHosts.clear();
-  }
-  
   /**
    * Wait until the shuffle finishes or until the timeout.
    * @param millis maximum wait time
    * @return true if the shuffle is done
    * @throws InterruptedException
    */
-  public synchronized boolean waitUntilDone(int millis
-                                            ) throws InterruptedException {
-    if (remainingMaps > 0) {
-      wait(millis);
-      return remainingMaps == 0;
-    }
-    return true;
-  }
-  
-  /**
-   * A structure that records the penalty for a host.
-   */
-  private static class Penalty implements Delayed {
-    MapHost host;
-    private long endTime;
-    
-    Penalty(MapHost host, long delay) {
-      this.host = host;
-      this.endTime = System.currentTimeMillis() + delay;
-    }
-
-    public long getDelay(TimeUnit unit) {
-      long remainingTime = endTime - System.currentTimeMillis();
-      return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
-    }
+  public boolean waitUntilDone(int millis) throws InterruptedException;
 
-    public int compareTo(Delayed o) {
-      long other = ((Penalty) o).endTime;
-      return endTime == other ? 0 : (endTime < other ? -1 : 1);
-    }
-    
-  }
-  
   /**
-   * A thread that takes hosts off of the penalty list when the timer expires.
+   * Interpret a {@link TaskCompletionEvent} from the event stream.
+   * @param tce Intermediate output metadata
    */
-  private class Referee extends Thread {
-    public Referee() {
-      setName("ShufflePenaltyReferee");
-      setDaemon(true);
-    }
+  public void resolve(TaskCompletionEvent tce)
+    throws IOException, InterruptedException;
 
-    public void run() {
-      try {
-        while (true) {
-          // take the first host that has an expired penalty
-          MapHost host = penalties.take().host;
-          synchronized (ShuffleScheduler.this) {
-            if (host.markAvailable() == MapHost.State.PENDING) {
-              pendingHosts.add(host);
-              ShuffleScheduler.this.notifyAll();
-            }
-          }
-        }
-      } catch (InterruptedException ie) {
-        return;
-      } catch (Throwable t) {
-        reporter.reportException(t);
-      }
-    }
-  }
-  
-  public void close() throws InterruptedException {
-    referee.interrupt();
-    referee.join();
-  }
+  public void close() throws InterruptedException;
 
-  public synchronized void informMaxMapRunTime(int duration) {
-    if (duration > maxMapRuntime) {
-      maxMapRuntime = duration;
-    }
-  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java?rev=1493446&r1=1493445&r2=1493446&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java Sun Jun 16 03:07:56 2013
@@ -43,7 +43,8 @@ import org.mockito.InOrder;
 public class TestEventFetcher {
 
   @Test
-  public void testConsecutiveFetch() throws IOException {
+  public void testConsecutiveFetch()
+      throws IOException, InterruptedException {
     final int MAX_EVENTS_TO_FETCH = 100;
     TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 1);
 
@@ -63,7 +64,8 @@ public class TestEventFetcher {
       .thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH*2, 3));
 
     @SuppressWarnings("unchecked")
-    ShuffleScheduler<String,String> scheduler = mock(ShuffleScheduler.class);
+    ShuffleScheduler<String,String> scheduler =
+      mock(ShuffleScheduler.class);
     ExceptionReporter reporter = mock(ExceptionReporter.class);
 
     EventFetcherForTest<String,String> ef =
@@ -79,8 +81,8 @@ public class TestEventFetcher {
         eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid));
     inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
         eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid));
-    verify(scheduler, times(MAX_EVENTS_TO_FETCH*2 + 3)).addKnownMapOutput(
-        anyString(), anyString(), any(TaskAttemptID.class));
+    verify(scheduler, times(MAX_EVENTS_TO_FETCH*2 + 3)).resolve(
+        any(TaskCompletionEvent.class));
   }
 
   private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(
@@ -108,7 +110,8 @@ public class TestEventFetcher {
     }
 
     @Override
-    public int getMapCompletionEvents() throws IOException {
+    public int getMapCompletionEvents()
+        throws IOException, InterruptedException {
       return super.getMapCompletionEvents();
     }
 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1493446&r1=1493445&r2=1493446&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Sun Jun 16 03:07:56 2013
@@ -56,9 +56,10 @@ public class TestFetcher {
     private HttpURLConnection connection;
 
     public FakeFetcher(JobConf job, TaskAttemptID reduceId,
-        ShuffleScheduler<K,V> scheduler, MergeManagerImpl<K,V> merger, Reporter reporter,
-        ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter,
-        SecretKey jobTokenSecret, HttpURLConnection connection) {
+        ShuffleSchedulerImpl<K,V> scheduler, MergeManagerImpl<K,V> merger,
+        Reporter reporter, ShuffleClientMetrics metrics,
+        ExceptionReporter exceptionReporter, SecretKey jobTokenSecret,
+        HttpURLConnection connection) {
       super(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter,
           jobTokenSecret);
       this.connection = connection;
@@ -79,7 +80,7 @@ public class TestFetcher {
     LOG.info("testCopyFromHostConnectionTimeout");
     JobConf job = new JobConf();
     TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
-    ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+    ShuffleSchedulerImpl<Text, Text> ss = mock(ShuffleSchedulerImpl.class);
     MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
     Reporter r = mock(Reporter.class);
     ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
@@ -127,7 +128,7 @@ public class TestFetcher {
     LOG.info("testCopyFromHostBogusHeader");
     JobConf job = new JobConf();
     TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
-    ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+    ShuffleSchedulerImpl<Text, Text> ss = mock(ShuffleSchedulerImpl.class);
     MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
     Reporter r = mock(Reporter.class);
     ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
@@ -182,7 +183,7 @@ public class TestFetcher {
     LOG.info("testCopyFromHostWait");
     JobConf job = new JobConf();
     TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
-    ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+    ShuffleSchedulerImpl<Text, Text> ss = mock(ShuffleSchedulerImpl.class);
     MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
     Reporter r = mock(Reporter.class);
     ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
@@ -240,7 +241,7 @@ public class TestFetcher {
     LOG.info("testCopyFromHostWaitExtraBytes");
     JobConf job = new JobConf();
     TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
-    ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+    ShuffleSchedulerImpl<Text, Text> ss = mock(ShuffleSchedulerImpl.class);
     MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
     InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
 
@@ -256,7 +257,6 @@ public class TestFetcher {
     
     Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
         r, metrics, except, key, connection);
-    
 
     MapHost host = new MapHost("localhost", "http://localhost:8080/");
     
@@ -315,7 +315,7 @@ public class TestFetcher {
     LOG.info("testCopyFromHostCompressFailure");
     JobConf job = new JobConf();
     TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
-    ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+    ShuffleSchedulerImpl<Text, Text> ss = mock(ShuffleSchedulerImpl.class);
     MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
     InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
     Reporter r = mock(Reporter.class);

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java?rev=1493446&r1=1493445&r2=1493446&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java Sun Jun 16 03:07:56 2013
@@ -47,8 +47,10 @@ public class TestShuffleScheduler {
     };
     Progress progress = new Progress();
 
-    ShuffleScheduler scheduler = new ShuffleScheduler(job, status, null,
-        progress, null, null, null);
+    TaskAttemptID reduceId = new TaskAttemptID("314159", 0, TaskType.REDUCE,
+        0, 0);
+    ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(job, status,
+        reduceId, null, progress, null, null, null);
 
     JobID jobId = new JobID();
     TaskID taskId1 = new TaskID(jobId, TaskType.REDUCE, 1);