You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/10/21 22:33:50 UTC

tez git commit: TEZ-2882. Consider improving fetch failure handling (rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master a9cfeb914 -> 46f2454ac


TEZ-2882. Consider improving fetch failure handling (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/46f2454a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/46f2454a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/46f2454a

Branch: refs/heads/master
Commit: 46f2454acfb22eebd7937280854b2a8d6b5003b9
Parents: a9cfeb9
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Wed Oct 21 13:34:14 2015 -0700
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Wed Oct 21 13:34:14 2015 -0700

----------------------------------------------------------------------
 .../library/api/TezRuntimeConfiguration.java    | 108 ++++
 .../orderedgrouped/FetcherOrderedGrouped.java   |  10 +-
 .../ShuffleInputEventHandlerOrderedGrouped.java |   9 +-
 .../orderedgrouped/ShuffleScheduler.java        | 462 ++++++++++----
 .../library/input/OrderedGroupedKVInput.java    |  14 +
 .../shuffle/orderedgrouped/TestFetcher.java     |  10 +-
 ...tShuffleInputEventHandlerOrderedGrouped.java |  22 +-
 .../orderedgrouped/TestShuffleScheduler.java    | 623 ++++++++++++++++++-
 .../TestOrderedGroupedMergedKVInputConfig.java  |  30 +
 9 files changed, 1149 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index a84448f..cf05546 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -198,6 +198,106 @@ public class TezRuntimeConfiguration {
       "shuffle.fetch.failures.limit";
   public static final int TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT_DEFAULT = 5;
 
+  @Private
+  @Unstable
+  @ConfigurationProperty(type = "integer")
+  /**
+   * Expert setting made available only for debugging. Do not change it. Sets
+   * the number of retries before giving up on downloading from source
+   * attempt by consumer. Code internally handles the threshold if set to -1.
+   */
+  public static final String
+      TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT =
+      TEZ_RUNTIME_PREFIX + "shuffle.src-attempt.abort.limit";
+  public static final int
+      TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT_DEFAULT = -1;
+
+  @Private
+  @Unstable
+  @ConfigurationProperty(type = "float")
+  /**
+   * Expert setting made available only for debugging. Do not change it. Setting
+   * to determine if failures happened across a percentage of nodes. This
+   * helps in determining if the consumer has to be restarted on continuous
+   * failures. Setting it to lower value can make consumer restarts more
+   * aggressive on failures.
+   */
+  public static final String
+      TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION =
+      TEZ_RUNTIME_PREFIX + "shuffle.acceptable.host-fetch.failure.fraction";
+  public static final float
+      TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION_DEFAULT = 0.2f;
+
+  @Private
+  @Unstable
+  @ConfigurationProperty(type = "integer")
+  /**
+   * Expert setting made available only for debugging. Do not change it. Setting
+   * to determine if the consumer has to be restarted on continuous
+   * failures across nodes. Used along with
+   * {@link #TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION}.
+   */
+  public static final String
+      TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST =
+      TEZ_RUNTIME_PREFIX + "shuffle.min.failures.per.host";
+  public static final int TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST_DEFAULT = 4;
+
+  @Private
+  @Unstable
+  @ConfigurationProperty(type = "float")
+  /**
+   * Expert setting made available only for debugging. Do not change it.
+   * Maximum fraction of time (relative to overall progress) that the fetcher
+   * is allowed before concluding that it is stalled.
+   */
+  public static final String TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION =
+      TEZ_RUNTIME_PREFIX + "shuffle.max.stall.time.fraction";
+  public static final float
+      TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION_DEFAULT = 0.5f;
+
+  @Private
+  @Unstable
+  @ConfigurationProperty(type = "float")
+  /**
+   * Expert setting made available only for debugging. Do not change it.
+   * Fraction used to determine whether the shuffle has progressed enough.
+   * If it has not, the consumer could qualify for a restart.
+   */
+  public static final String
+      TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION =
+      TEZ_RUNTIME_PREFIX + "shuffle.min.required.progress.fraction";
+  public static final float
+      TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION_DEFAULT = 0.5f;
+
+  @Private
+  @Unstable
+  @ConfigurationProperty(type = "float")
+  /**
+   * Expert setting made available only for debugging. Do not change it.
+   * Provides threshold for determining whether fetching has to be marked
+   * unhealthy based on the ratio of (failures/(failures+completed))
+   */
+  public static final String
+      TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION =
+      TEZ_RUNTIME_PREFIX + "shuffle.max.allowed.failed.fetch.fraction";
+  public static final float
+      TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION_DEFAULT = 0.5f;
+
+  @Private
+  @Unstable
+  @ConfigurationProperty(type = "boolean")
+  /**
+   * Expert setting made available only for debugging. Do not change it.
+   * Whether failed fetches since the last successful (non-local) copy should
+   * also be checked when determining if fetching has to be marked unhealthy.
+   */
+  public static final String
+      TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION =
+      TEZ_RUNTIME_PREFIX + "shuffle.failed.check.since-last.completion";
+  public static final boolean
+      TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT = true;
+
+
   @ConfigurationProperty(type = "integer")
   public static final String TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE =
       TEZ_RUNTIME_PREFIX +
@@ -418,6 +518,14 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS);
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM);
+    tezRuntimeKeys.add
+        (TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION);
+    tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST);
+    tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION);
+    tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT);
+    tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION);
+    tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION);
+    tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION);
     tezRuntimeKeys.add(TEZ_RUNTIME_REPORT_PARTITION_STATS);
     tezRuntimeKeys.add(TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT);
     tezRuntimeKeys.add(TEZ_RUNTIME_GROUP_COMPARATOR_CLASS);

http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 1b4031d..93f083d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -297,7 +297,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
         } else {
           LOG.warn("copyMapOutput failed for tasks " + Arrays.toString(failedTasks));
           for (InputAttemptIdentifier left : failedTasks) {
-            scheduler.copyFailed(left, host, true, false);
+            scheduler.copyFailed(left, host, true, false, false);
           }
         }
       }
@@ -359,7 +359,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
       for (InputAttemptIdentifier left : remaining.values()) {
         // Need to be handling temporary glitches ..
         // Report read error to the AM to trigger source failure heuristics
-        scheduler.copyFailed(left, host, connectSucceeded, !connectSucceeded);
+        scheduler.copyFailed(left, host, connectSucceeded, !connectSucceeded, false);
       }
       return false;
     }
@@ -505,7 +505,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
       retryStartTime = 0;
 
       scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength,
-                              endTime - startTime, mapOutput);
+                              endTime - startTime, mapOutput, false);
       // Note successful shuffle
       remaining.remove(srcAttemptId.toString());
       metrics.successFetch();
@@ -667,7 +667,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
           mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord);
           long endTime = System.currentTimeMillis();
           scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(),
-              indexRecord.getRawLength(), (endTime - startTime), mapOutput);
+              indexRecord.getRawLength(), (endTime - startTime), mapOutput, true);
           iter.remove();
           metrics.successFetch();
         } catch (IOException e) {
@@ -677,7 +677,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
           if (!stopped) {
             metrics.failedFetch();
             ioErrs.increment(1);
-            scheduler.copyFailed(srcAttemptId, host, true, false);
+            scheduler.copyFailed(srcAttemptId, host, true, false, true);
             LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
                 host.getHostIdentifier(), e);
           } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index e0473b3..f8c9553 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -50,7 +50,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
   private final ShuffleScheduler scheduler;
   private final InputContext inputContext;
 
-  private int maxMapRuntime = 0;
   private final boolean sslShuffle;
 
   private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
@@ -110,12 +109,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
           + ", attemptNum: " + dmEvent.getVersion() + ", payload: " +
           ShuffleUtils.stringify(shufflePayload));
     }
-    // TODO NEWTEZ See if this duration hack can be removed.
-    int duration = shufflePayload.getRunDuration();
-    if (duration > maxMapRuntime) {
-      maxMapRuntime = duration;
-      scheduler.informMaxMapRunTime(maxMapRuntime);
-    }
     if (shufflePayload.hasEmptyPartitions()) {
       try {
         byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
@@ -128,7 +121,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
                     + srcAttemptIdentifier + "]. Not fetching.");
           }
           numDmeEventsNoData.incrementAndGet();
-          scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null);
+          scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null, true);
           return;
         }
       } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index f45ca35..22da46c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -46,6 +46,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -94,7 +95,7 @@ class ShuffleScheduler {
   private final AtomicLong shuffleStart = new AtomicLong(0);
 
   private static final Logger LOG = LoggerFactory.getLogger(ShuffleScheduler.class);
-  private static final long INITIAL_PENALTY = 2000l; // 2 seconds
+  static final long INITIAL_PENALTY = 2000L; // 2 seconds
   private static final float PENALTY_GROWTH_RATE = 1.3f;
 
   private final BitSet finishedMaps;
@@ -103,22 +104,26 @@ class ShuffleScheduler {
   @VisibleForTesting
   final Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
   //TODO Clean this and other maps at some point
-  private final ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>();
+  @VisibleForTesting
+  final ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap
+      = new ConcurrentHashMap<String, InputAttemptIdentifier>();
 
   //To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is
   // enabled in source.
   @VisibleForTesting
-  final Map<InputIdentifier, ShuffleEventInfo> shuffleInfoEventsMap;
+  final Map<InputIdentifier, ShuffleEventInfo> pipelinedShuffleInfoEventsMap;
 
-  private final Set<MapHost> pendingHosts = new HashSet<MapHost>();
+  @VisibleForTesting
+  final Set<MapHost> pendingHosts = new HashSet<MapHost>();
   private final Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<InputAttemptIdentifier>();
 
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
   private final Random random = new Random(System.currentTimeMillis());
   private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
   private final Referee referee;
-  private final Map<InputAttemptIdentifier, IntWritable> failureCounts =
-    new HashMap<InputAttemptIdentifier,IntWritable>(); 
+  @VisibleForTesting
+  final Map<InputAttemptIdentifier, IntWritable> failureCounts = new HashMap<InputAttemptIdentifier,IntWritable>();
+  final Set<String> uniqueHosts = Sets.newHashSet();
   private final Map<String,IntWritable> hostFailures = 
     new HashMap<String,IntWritable>();
   private final InputContext inputContext;
@@ -126,7 +131,8 @@ class ShuffleScheduler {
   private final TezCounter skippedInputCounter;
   private final TezCounter reduceShuffleBytes;
   private final TezCounter reduceBytesDecompressed;
-  private final TezCounter failedShuffleCounter;
+  @VisibleForTesting
+  final TezCounter failedShuffleCounter;
   private final TezCounter bytesShuffledToDisk;
   private final TezCounter bytesShuffledToDiskDirect;
   private final TezCounter bytesShuffledToMem;
@@ -134,9 +140,13 @@ class ShuffleScheduler {
   private final TezCounter lastEventReceived;
 
   private final String srcNameTrimmed;
-  private final AtomicInteger remainingMaps;
+  @VisibleForTesting
+  final AtomicInteger remainingMaps;
   private final long startTime;
-  private long lastProgressTime;
+  @VisibleForTesting
+  long lastProgressTime;
+  @VisibleForTesting
+  long failedShufflesSinceLastCompletion;
 
   private final int numFetchers;
   private final Set<FetcherOrderedGrouped> runningFetchers =
@@ -171,12 +181,18 @@ class ShuffleScheduler {
   private final boolean reportReadErrorImmediately;
   private final int maxFailedUniqueFetches;
   private final int abortFailureLimit;
-  private int maxMapRuntime = 0;
+
+  private final int minFailurePerHost;
+  private final float hostFailureFraction;
+  private final float maxStallTimeFraction;
+  private final float minReqProgressFraction;
+  private final float maxAllowedFailedFetchFraction;
+  private final boolean checkFailedFetchSinceLastCompletion;
 
   private volatile Thread shuffleSchedulerThread = null;
 
   private long totalBytesShuffledTillNow = 0;
-  private DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
+  private final DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
 
   public ShuffleScheduler(InputContext inputContext,
                           Configuration conf,
@@ -195,7 +211,15 @@ class ShuffleScheduler {
     this.allocator = allocator;
     this.mergeManager = mergeManager;
     this.numInputs = numberOfInputs;
-    abortFailureLimit = Math.max(30, numberOfInputs / 10);
+    int abortFailureLimitConf = conf.getInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT, TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT_DEFAULT);
+    if (abortFailureLimitConf <= -1) {
+      abortFailureLimit = Math.max(15, numberOfInputs / 10);
+    } else {
+      //No upper cap, as user is setting this intentionally
+      abortFailureLimit = abortFailureLimitConf;
+    }
     remainingMaps = new AtomicInteger(numberOfInputs);
     finishedMaps = new BitSet(numberOfInputs);
     this.ifileReadAhead = ifileReadAhead;
@@ -211,6 +235,47 @@ class ShuffleScheduler {
     localDiskFetchEnabled = conf.getBoolean(
         TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
         TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
+
+    this.minFailurePerHost = conf.getInt(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST_DEFAULT);
+    Preconditions.checkArgument(minFailurePerHost >= 0,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST
+            + "=" + minFailurePerHost + " should not be negative");
+
+    this.hostFailureFraction = conf.getFloat(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION,
+        TezRuntimeConfiguration
+            .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION_DEFAULT);
+
+    this.maxStallTimeFraction = conf.getFloat(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION_DEFAULT);
+    Preconditions.checkArgument(maxStallTimeFraction >= 0,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION
+            + "=" + maxStallTimeFraction + " should not be negative");
+
+    this.minReqProgressFraction = conf.getFloat(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION,
+        TezRuntimeConfiguration
+            .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION_DEFAULT);
+    Preconditions.checkArgument(minReqProgressFraction >= 0,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION
+            + "=" + minReqProgressFraction + " should not be negative");
+
+    this.maxAllowedFailedFetchFraction = conf.getFloat(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION_DEFAULT);
+    Preconditions.checkArgument(maxAllowedFailedFetchFraction >= 0,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION
+            + "=" + maxAllowedFailedFetchFraction + " should not be negative");
+
+    this.checkFailedFetchSinceLastCompletion = conf.getBoolean
+        (TezRuntimeConfiguration
+            .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION,
+            TezRuntimeConfiguration
+                .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT);
+
     this.localHostname = inputContext.getExecutionContext().getHostName();
     final ByteBuffer shuffleMetadata =
         inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
@@ -284,16 +349,22 @@ class ShuffleScheduler {
     this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
     this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
 
-    shuffleInfoEventsMap = new HashMap<InputIdentifier, ShuffleEventInfo>();
+    pipelinedShuffleInfoEventsMap = new HashMap<InputIdentifier, ShuffleEventInfo>();
     LOG.info("ShuffleScheduler running for sourceVertex: "
         + inputContext.getSourceVertexName() + " with configuration: "
         + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting
         + ", reportReadErrorImmediately=" + reportReadErrorImmediately
         + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches
         + ", abortFailureLimit=" + abortFailureLimit
-        + ", maxMapRuntime=" + maxMapRuntime
         + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce
-        + ", numFetchers=" + numFetchers);
+        + ", numFetchers=" + numFetchers
+        + ", hostFailureFraction=" + hostFailureFraction
+        + ", minFailurePerHost=" + minFailurePerHost
+        + ", maxAllowedFailedFetchFraction=" + maxAllowedFailedFetchFraction
+        + ", maxStallTimeFraction=" + maxStallTimeFraction
+        + ", minReqProgressFraction=" + minReqProgressFraction
+        + ", checkFailedFetchSinceLastCompletion=" + checkFailedFetchSinceLastCompletion
+    );
   }
 
   public void start() throws Exception {
@@ -391,10 +462,17 @@ class ShuffleScheduler {
                                          long bytesCompressed,
                                          long bytesDecompressed,
                                          long millis,
-                                         MapOutput output
+                                         MapOutput output,
+                                         boolean isLocalFetch
                                          ) throws IOException {
 
     if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
+      if (!isLocalFetch) {
+        /**
+         * Reset it only when it is a non-local-disk copy.
+         */
+        failedShufflesSinceLastCompletion = 0;
+      }
       if (output != null) {
 
         failureCounts.remove(srcAttemptIdentifier);
@@ -435,12 +513,12 @@ class ShuffleScheduler {
           return;
         }
 
-        ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(inputIdentifier);
+        ShuffleEventInfo eventInfo = pipelinedShuffleInfoEventsMap.get(inputIdentifier);
 
         //Possible that Shuffle event handler invoked this, due to empty partitions
         if (eventInfo == null && output == null) {
           eventInfo = new ShuffleEventInfo(srcAttemptIdentifier);
-          shuffleInfoEventsMap.put(inputIdentifier, eventInfo);
+          pipelinedShuffleInfoEventsMap.put(inputIdentifier, eventInfo);
         }
 
         assert(eventInfo != null);
@@ -455,10 +533,10 @@ class ShuffleScheduler {
         if (eventInfo.isDone()) {
           remainingMaps.decrementAndGet();
           setInputFinished(inputIdentifier.getInputIndex());
-          shuffleInfoEventsMap.remove(inputIdentifier);
+          pipelinedShuffleInfoEventsMap.remove(inputIdentifier);
           if (LOG.isTraceEnabled()) {
             LOG.trace("Removing : " + srcAttemptIdentifier + ", pending: " +
-                shuffleInfoEventsMap);
+                pipelinedShuffleInfoEventsMap);
           }
         }
 
@@ -486,7 +564,8 @@ class ShuffleScheduler {
       }
     } else {
       // input is already finished. duplicate fetch.
-      LOG.warn("Duplicate fetch of input no longer needs to be fetched: " + srcAttemptIdentifier);
+      LOG.warn("Duplicate fetch of input no longer needs to be fetched: "
+          + srcAttemptIdentifier);
       // free the resource - specially memory
       
       // If the src does not generate data, output will be null.
@@ -501,7 +580,7 @@ class ShuffleScheduler {
     //For pipelined shuffle.
     //TODO: TEZ-2132 for error handling. As of now, fail fast if there is a different attempt
     if (input.canRetrieveInputInChunks()) {
-      ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(input.getInputIdentifier());
+      ShuffleEventInfo eventInfo = pipelinedShuffleInfoEventsMap.get(input.getInputIdentifier());
       if (eventInfo != null && input.getAttemptNumber() != eventInfo.attemptNum) {
         reportExceptionForInput(new IOException("Previous event already got scheduled for " +
             input + ". Previous attempt's data could have been already merged "
@@ -512,7 +591,8 @@ class ShuffleScheduler {
       }
 
       if (eventInfo == null) {
-        shuffleInfoEventsMap.put(input.getInputIdentifier(), new ShuffleEventInfo(input));
+        pipelinedShuffleInfoEventsMap.put(input.getInputIdentifier(),
+            new ShuffleEventInfo(input));
       }
     }
     return true;
@@ -543,95 +623,264 @@ class ShuffleScheduler {
   public synchronized void copyFailed(InputAttemptIdentifier srcAttempt,
                                       MapHost host,
                                       boolean readError,
-                                      boolean connectError) {
-    host.penalize();
-    int failures = 1;
-    if (failureCounts.containsKey(srcAttempt)) {
-      IntWritable x = failureCounts.get(srcAttempt);
-      x.set(x.get() + 1);
-      failures = x.get();
-    } else {
-      failureCounts.put(srcAttempt, new IntWritable(1));      
+                                      boolean connectError,
+                                      boolean isLocalFetch) {
+    failedShuffleCounter.increment(1);
+
+    int failures = incrementAndGetFailureAttempt(srcAttempt);
+
+    if (!isLocalFetch) {
+      /**
+       * Track the number of failures that have happened since the last completion.
+       * This gets reset on a successful copy.
+       */
+      failedShufflesSinceLastCompletion++;
     }
-    String hostPort = host.getHostIdentifier();
-    // TODO TEZ-922 hostFailures isn't really used for anything. Factor it into error
-    // reporting / potential blacklisting of hosts.
-    if (hostFailures.containsKey(hostPort)) {
-      IntWritable x = hostFailures.get(hostPort);
-      x.set(x.get() + 1);
-    } else {
-      hostFailures.put(hostPort, new IntWritable(1));
+
+    /**
+     * Inform AM:
+     *    - In case of read/connect error (if reportReadErrorImmediately is set)
+     *    - Each time attempt failures reach a multiple of
+     *    maxFetchFailuresBeforeReporting (5)
+     * Bail out if needed:
+     *    - Check whether the individual attempt crossed the failure threshold
+     *    - Check overall shuffle health. Bail out if needed.
+     */
+
+    //TEZ-2890
+    boolean shouldInformAM =
+        (reportReadErrorImmediately && (readError || connectError))
+            || ((failures % maxFetchFailuresBeforeReporting) == 0);
+
+    if (shouldInformAM) {
+      //Inform AM. In case producer needs to be restarted, it is handled at AM.
+      informAM(srcAttempt);
+    }
+
+    //Restart consumer in case shuffle is not healthy
+    if (!isShuffleHealthy(srcAttempt)) {
+      return;
     }
-    if (failures >= abortFailureLimit) {
+
+    penalizeHost(host, failures);
+  }
+
+  private boolean isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) {
+    int attemptFailures = getFailureCount(srcAttempt);
+    if (attemptFailures >= abortFailureLimit) {
       // This task has seen too many fetch failures - report it as failed. The
       // AM may retry it if max failures has not been reached.
-      
+
       // Between the task and the AM - someone needs to determine who is at
       // fault. If there's enough errors seen on the task, before the AM informs
       // it about source failure, the task considers itself to have failed and
       // allows the AM to re-schedule it.
-      IOException ioe = new IOException(failures
-            + " failures downloading "
-            + TezRuntimeUtils.getTaskAttemptIdentifier(
-                inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
-                srcAttempt.getAttemptNumber()));
-      ioe.fillInStackTrace();
+      String errorMsg = "Failed " + attemptFailures + " times trying to "
+          + "download from " + TezRuntimeUtils.getTaskAttemptIdentifier(
+          inputContext.getSourceVertexName(),
+          srcAttempt.getInputIdentifier().getInputIndex(),
+          srcAttempt.getAttemptNumber()) + ". threshold=" + abortFailureLimit;
+      IOException ioe = new IOException(errorMsg);
       // Shuffle knows how to deal with failures post shutdown via the onFailure hook
       exceptionReporter.reportException(ioe);
+      return true;
     }
+    return false;
+  }
 
-    failedShuffleCounter.increment(1);
-    checkAndInformAM(failures, srcAttempt, readError, connectError);
+  private void penalizeHost(MapHost host, int failures) {
+    host.penalize();
+
+    String hostPort = host.getHostIdentifier();
+    // TODO TEZ-922 hostFailures isn't really used for anything apart from
+    // hasFailedAcrossNodes(). Factor it into error
+    // reporting / potential blacklisting of hosts.
+    if (hostFailures.containsKey(hostPort)) {
+      IntWritable x = hostFailures.get(hostPort);
+      x.set(x.get() + 1);
+    } else {
+      hostFailures.put(hostPort, new IntWritable(1));
+    }
 
-    checkReducerHealth();
-    
     long delay = (long) (INITIAL_PENALTY *
         Math.pow(PENALTY_GROWTH_RATE, failures));
-    
     penalties.add(new Penalty(host, delay));
   }
 
+  private int getFailureCount(InputAttemptIdentifier srcAttempt) {
+    IntWritable failureCount = failureCounts.get(srcAttempt);
+    return (failureCount == null) ? 0 : failureCount.get();
+  }
+
+  private int incrementAndGetFailureAttempt(InputAttemptIdentifier srcAttempt) {
+    int failures = 1;
+    if (failureCounts.containsKey(srcAttempt)) {
+      IntWritable x = failureCounts.get(srcAttempt);
+      x.set(x.get() + 1);
+      failures = x.get();
+    } else {
+      failureCounts.put(srcAttempt, new IntWritable(1));
+    }
+    return failures;
+  }
+
   public void reportLocalError(IOException ioe) {
-    LOG.error(srcNameTrimmed + ": " + "Shuffle failed : caused by local error", ioe);
+    LOG.error(srcNameTrimmed + ": " + "Shuffle failed : caused by local error",
+        ioe);
     // Shuffle knows how to deal with failures post shutdown via the onFailure hook
     exceptionReporter.reportException(ioe);
   }
 
-  // Notify the AM  
-  // after every read error, if 'reportReadErrorImmediately' is true or
-  // after every 'maxFetchFailuresBeforeReporting' failures
-  private void checkAndInformAM(
-      int failures, InputAttemptIdentifier srcAttempt, boolean readError,
-      boolean connectError) {
-    if ((reportReadErrorImmediately && (readError || connectError))
-        || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
-      LOG.info(srcNameTrimmed + ": " + "Reporting fetch failure for InputIdentifier: "
-          + srcAttempt + " taskAttemptIdentifier: "
-          + TezRuntimeUtils.getTaskAttemptIdentifier(
-          inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
-          srcAttempt.getAttemptNumber()) + " to AM.");
-      List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
-      failedEvents.add(InputReadErrorEvent.create("Fetch failure for "
-          + TezRuntimeUtils.getTaskAttemptIdentifier(
-          inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
-          srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt.getInputIdentifier()
-          .getInputIndex(), srcAttempt.getAttemptNumber()));
-
-      inputContext.sendEvents(failedEvents);      
+  // Notify AM
+  private void informAM(InputAttemptIdentifier srcAttempt) {
+    LOG.info(
+        srcNameTrimmed + ": " + "Reporting fetch failure for InputIdentifier: "
+            + srcAttempt + " taskAttemptIdentifier: " + TezRuntimeUtils
+            .getTaskAttemptIdentifier(inputContext.getSourceVertexName(),
+                srcAttempt.getInputIdentifier().getInputIndex(),
+                srcAttempt.getAttemptNumber()) + " to AM.");
+    List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
+    failedEvents.add(InputReadErrorEvent.create(
+        "Fetch failure for " + TezRuntimeUtils
+            .getTaskAttemptIdentifier(inputContext.getSourceVertexName(),
+                srcAttempt.getInputIdentifier().getInputIndex(),
+                srcAttempt.getAttemptNumber()) + " to jobtracker.",
+        srcAttempt.getInputIdentifier().getInputIndex(),
+        srcAttempt.getAttemptNumber()));
+
+    inputContext.sendEvents(failedEvents);
+  }
+
+  /**
+   * To determine if failures happened across nodes or not. This will help in
+   * determining whether this task needs to be restarted or source needs to
+   * be restarted.
+   *
+   * @param logContext context info for logging
+   * @return boolean true indicates this task needs to be restarted
+   */
+  private boolean hasFailedAcrossNodes(String logContext) {
+    int numUniqueHosts = uniqueHosts.size();
+    Preconditions.checkArgument(numUniqueHosts > 0, "No values in unique hosts");
+    int threshold = Math.max(3,
+        (int) Math.ceil(numUniqueHosts * hostFailureFraction));
+    int total = 0;
+    boolean failedAcrossNodes = false;
+    for(String host : uniqueHosts) {
+      IntWritable failures = hostFailures.get(host);
+      if (failures != null && failures.get() > minFailurePerHost) {
+        total++;
+        failedAcrossNodes = (total > (threshold * minFailurePerHost));
+        if (failedAcrossNodes) {
+          break;
+        }
+      }
+    }
+
+    LOG.info(logContext + ", numUniqueHosts=" + numUniqueHosts
+        + ", hostFailureThreshold=" + threshold
+        + ", hostFailuresCount=" + hostFailures.size()
+        + ", hosts crossing threshold=" + total
+        + ", reducerFetchIssues=" + failedAcrossNodes
+      );
+
+    return failedAcrossNodes;
+  }
+
+  private boolean allEventsReceived() {
+    if (!pipelinedShuffleInfoEventsMap.isEmpty()) {
+      return (pipelinedShuffleInfoEventsMap.size() == numInputs);
+    } else {
+      //no pipelining
+      return ((pathToIdentifierMap.size() + skippedInputCounter.getValue())
+          == numInputs);
     }
   }
 
-  private void checkReducerHealth() {
-    final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
-    final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
-    final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
+  /**
+   * Check if consumer needs to be restarted based on total failures w.r.t.
+   * completed outputs and based on number of errors that have happened since
+   * the last successful completion. Also take into account whether failures have
+   * been seen across different nodes.
+   *
+   * @return true to indicate fetchers are healthy
+   */
+  private boolean isFetcherHealthy(String logContext) {
 
     long totalFailures = failedShuffleCounter.getValue();
     int doneMaps = numInputs - remainingMaps.get();
-    
-    boolean reducerHealthy =
-      (((float)totalFailures / (totalFailures + doneMaps))
-          < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
+
+    boolean fetcherHealthy = true;
+    if (doneMaps > 0) {
+      fetcherHealthy = (((float) totalFailures / (totalFailures + doneMaps))
+          < maxAllowedFailedFetchFraction);
+    }
+
+    if (fetcherHealthy) {
+      //Compute this logic only when all events are received
+      if (allEventsReceived()) {
+        if (hostFailureFraction > 0) {
+          boolean failedAcrossNodes = hasFailedAcrossNodes(logContext);
+          if (failedAcrossNodes) {
+            return false; //not healthy
+          }
+        }
+
+        if (checkFailedFetchSinceLastCompletion) {
+          /**
+           * remainingMaps works better than pendingHosts in the
+           * following condition because of the way the fetcher reports failures
+           */
+          if (failedShufflesSinceLastCompletion >=
+              remainingMaps.get() * minFailurePerHost) {
+            /**
+             * Check if lots of errors are seen after last progress time.
+             *
+             * E.g. totalFailures = 20, doneMaps = 300 (320 inputs - 20 remaining);
+             * fetcherHealthy = (20/(20+300)) < 0.5.  So reducer would be marked as healthy.
+             * Assume 20 errors happen when downloading the last 20 attempts. Host failure & individual
+             * attempt failures would keep increasing; but at very slow rate 15 * 180 seconds per
+             * attempt to find out the issue.
+             *
+             * Instead consider the new errors with the pending items to be fetched.
+             * Assume 21 new errors happened after last progress; remainingMaps = (320-300) = 20;
+             * (21 / (21 + 20)) > 0.5
+             * So we reset the reducer to unhealthy here (special case)
+             *
+             * In normal conditions (i.e happy path), this wouldn't even cause any issue as
+             * failedShufflesSinceLastCompletion is reset as soon as we see successful download.
+             */
+
+            fetcherHealthy =
+                (((float) failedShufflesSinceLastCompletion / (
+                    failedShufflesSinceLastCompletion + remainingMaps.get()))
+                    < maxAllowedFailedFetchFraction);
+
+            LOG.info(logContext + ", fetcherHealthy=" + fetcherHealthy
+                + ", failedShufflesSinceLastCompletion="
+                + failedShufflesSinceLastCompletion
+                + ", remainingMaps=" + remainingMaps.get()
+            );
+          }
+        }
+      }
+    }
+    return fetcherHealthy;
+  }
+
+  boolean isShuffleHealthy(InputAttemptIdentifier srcAttempt) {
+
+    if (isAbortLimitExceeedFor(srcAttempt)) {
+      return false;
+    }
+
+    final float MIN_REQUIRED_PROGRESS_PERCENT = minReqProgressFraction;
+    final float MAX_ALLOWED_STALL_TIME_PERCENT = maxStallTimeFraction;
+
+    int doneMaps = numInputs - remainingMaps.get();
+
+    String logContext = "srcAttempt=" + srcAttempt.toString();
+    boolean fetcherHealthy = isFetcherHealthy(logContext);
     
     // check if the reducer has progressed enough
     boolean reducerProgressedEnough =
@@ -647,31 +896,31 @@ class ShuffleScheduler {
     int shuffleProgressDuration =
       (int)(lastProgressTime - startTime);
 
-    // min time the reducer should run without getting killed
-    int minShuffleRunDuration =
-      (shuffleProgressDuration > maxMapRuntime)
-      ? shuffleProgressDuration
-          : maxMapRuntime;
-    
-    boolean reducerStalled =
-      (((float)stallDuration / minShuffleRunDuration)
+    boolean reducerStalled = (shuffleProgressDuration > 0) &&
+      (((float)stallDuration / shuffleProgressDuration)
           >= MAX_ALLOWED_STALL_TIME_PERCENT);
 
     // kill if not healthy and has insufficient progress
     if ((failureCounts.size() >= maxFailedUniqueFetches ||
         failureCounts.size() == (numInputs - doneMaps))
-        && !reducerHealthy
+        && !fetcherHealthy
         && (!reducerProgressedEnough || reducerStalled)) {
-      LOG.error(srcNameTrimmed + ": " + "Shuffle failed with too many fetch failures " +
-          "and insufficient progress!"
-          + "failureCounts=" + failureCounts.size() + ", pendingInputs=" + (numInputs - doneMaps)
-          + ", reducerHealthy=" + reducerHealthy + ", reducerProgressedEnough="
-          + reducerProgressedEnough + ", reducerStalled=" + reducerStalled);
-      String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";
+      String errorMsg = (srcNameTrimmed + ": "
+          + "Shuffle failed with too many fetch failures and insufficient progress!"
+          + "failureCounts=" + failureCounts.size()
+          + ", pendingInputs=" + (numInputs - doneMaps)
+          + ", fetcherHealthy=" + fetcherHealthy
+          + ", reducerProgressedEnough=" + reducerProgressedEnough
+          + ", reducerStalled=" + reducerStalled);
+      LOG.error(errorMsg);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Host failures=" + hostFailures.keySet());
+      }
       // Shuffle knows how to deal with failures post shutdown via the onFailure hook
       exceptionReporter.reportException(new IOException(errorMsg));
+      return false;
     }
-
+    return true;
   }
 
   public synchronized void addKnownMapOutput(String inputHostName,
@@ -680,6 +929,7 @@ class ShuffleScheduler {
                                              String hostUrl,
                                              InputAttemptIdentifier srcAttempt) {
     String hostPort = (inputHostName + ":" + String.valueOf(port));
+    uniqueHosts.add(hostPort);
     String identifier = MapHost.createIdentifier(hostPort, partitionId);
 
 
@@ -697,7 +947,8 @@ class ShuffleScheduler {
 
     host.addKnownMap(srcAttempt);
     pathToIdentifierMap.put(
-        getIdentifierFromPathAndReduceId(srcAttempt.getPathComponent(), partitionId), srcAttempt);
+        getIdentifierFromPathAndReduceId(srcAttempt.getPathComponent(),
+            partitionId), srcAttempt);
 
     // Mark the host as pending
     if (host.getState() == MapHost.State.PENDING) {
@@ -709,8 +960,8 @@ class ShuffleScheduler {
   public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) {
     // The incoming srcAttempt does not contain a path component.
     LOG.info(srcNameTrimmed + ": " + "Adding obsolete input: " + srcAttempt);
-    if (shuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) {
-      //Pipelined shuffle case (where shuffleInfoEventsMap gets populated).
+    if (pipelinedShuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) {
+      //Pipelined shuffle case (where pipelinedShuffleInfoEventsMap gets populated).
       //Fail fast here.
       exceptionReporter.reportException(new IOException(srcAttempt + " is marked as obsoleteInput, but it "
           + "exists in shuffleInfoEventMap. Some data could have been already merged "
@@ -935,12 +1186,7 @@ class ShuffleScheduler {
     }
   }
   
-  public synchronized void informMaxMapRunTime(int duration) {
-    if (duration > maxMapRuntime) {
-      maxMapRuntime = duration;
-    }
-  }
-  
+
   void setInputFinished(int inputIndex) {
     synchronized(finishedMaps) {
       finishedMaps.set(inputIndex, true);

http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 880dc2f..39cc471 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -355,6 +355,20 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM);
+    confKeys.add(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT);
+    confKeys.add(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION);
+    confKeys.add(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST);
+    confKeys.add(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION);
+    confKeys.add(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION);
+    confKeys.add(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION);
+    confKeys.add(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS);

http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index ceb0266..faa2d31 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -278,7 +278,7 @@ public class TestFetcher {
     }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId()));
 
     doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class),
-        anyLong(), anyLong(), anyLong(), any(MapOutput.class));
+        anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean());
     doNothing().when(scheduler).putBackKnownMapOutput(host,
         srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
     doNothing().when(scheduler).putBackKnownMapOutput(host,
@@ -290,8 +290,8 @@ public class TestFetcher {
     for (int i : sucessfulAttemptsIndexes) {
       verifyCopySucceeded(scheduler, host, srcAttempts, i);
     }
-    verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true, false);
-    verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true, false);
+    verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true, false, true);
+    verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true, false, true);
 
     verify(metrics, times(3)).successFetch();
     verify(metrics, times(2)).failedFetch();
@@ -309,7 +309,7 @@ public class TestFetcher {
     String filenameToMatch = SHUFFLE_INPUT_FILE_PREFIX + srcAttemptToMatch.getPathComponent();
     ArgumentCaptor<MapOutput> captureMapOutput = ArgumentCaptor.forClass(MapOutput.class);
     verify(scheduler).copySucceeded(eq(srcAttemptToMatch), eq(host), eq(p * 100),
-        eq(p * 1000), anyLong(), captureMapOutput.capture());
+        eq(p * 1000), anyLong(), captureMapOutput.capture(), anyBoolean());
 
     // cannot use the equals of MapOutput as it compares id which is private. so doing it manually
     MapOutput m = captureMapOutput.getAllValues().get(0);
@@ -414,7 +414,7 @@ public class TestFetcher {
     verify(fetcher, times(2)).setupConnection(any(MapHost.class), any(Collection.class));
     //since copyMapOutput consistently fails, it should call copyFailed once
     verify(scheduler, times(1)).copyFailed(any(InputAttemptIdentifier.class), any(MapHost.class),
-          anyBoolean(), anyBoolean());
+          anyBoolean(), anyBoolean(), anyBoolean());
 
     verify(fetcher, times(1)).putBackRemainingMapOutputs(any(MapHost.class));
     verify(scheduler, times(3)).putBackKnownMapOutput(any(MapHost.class),

http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index 78d214c..88a1d20 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -171,7 +171,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
     int partitionId = attemptNum;
     verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id1));
-    verify(scheduler).shuffleInfoEventsMap.containsKey(id1.getInputIdentifier());
+    verify(scheduler).pipelinedShuffleInfoEventsMap.containsKey(id1.getInputIdentifier());
 
     //Send final_update event.
     Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, false, 1);
@@ -181,9 +181,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     handler.handleEvents(Collections.singletonList(dme2));
     baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
     partitionId = attemptNum;
-    assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
+    assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
     verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id2));
-    assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
+    assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
 
     MapHost host = scheduler.getHost();
     assertTrue(host != null);
@@ -191,10 +191,10 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     assertTrue(!list.isEmpty());
     //Let the final_update event pass
     MapOutput output = MapOutput.createMemoryMapOutput(id2, mergeManager, 1000, true);
-    scheduler.copySucceeded(id2, host, 1000, 10000, 10000, output);
+    scheduler.copySucceeded(id2, host, 1000, 10000, 10000, output, false);
     assertTrue(!scheduler.isDone()); //we haven't downloaded id1 yet
     output = MapOutput.createMemoryMapOutput(id1, mergeManager, 1000, true);
-    scheduler.copySucceeded(id1, host, 1000, 10000, 10000, output);
+    scheduler.copySucceeded(id1, host, 1000, 10000, 10000, output, false);
     assertTrue(!scheduler.isDone()); //we haven't downloaded another source yet
 
     //Send events for source 2
@@ -210,11 +210,11 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     InputAttemptIdentifier id4 = new InputAttemptIdentifier(new InputIdentifier(inputIdx),
         attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1);
     assertTrue(!scheduler.isInputFinished(id4.getInputIdentifier().getInputIndex()));
-    scheduler.copySucceeded(id4, null, 0, 0, 0, null);
+    scheduler.copySucceeded(id4, null, 0, 0, 0, null, false);
     assertTrue(!scheduler.isDone()); //we haven't downloaded another id yet
     //Let the incremental event pass
     output = MapOutput.createMemoryMapOutput(id3, mergeManager, 1000, true);
-    scheduler.copySucceeded(id3, host, 1000, 10000, 10000, output);
+    scheduler.copySucceeded(id3, host, 1000, 10000, 10000, output, false);
     assertTrue(scheduler.isDone());
   }
 
@@ -234,7 +234,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
 
     verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(baseUri), eq(id1));
     assertTrue("Shuffle info events should not be empty for pipelined shuffle",
-        !scheduler.shuffleInfoEventsMap.isEmpty());
+        !scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
 
     //Attempt #0 comes up. When processing this, it should report exception
     attemptNum = 0;
@@ -263,7 +263,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId),
         eq(baseUri), eq(expectedIdentifier));
     assertTrue("Shuffle info events should be empty for regular shuffle codepath",
-        scheduler.shuffleInfoEventsMap.isEmpty());
+        scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
   }
 
   @Test(timeout = 5000)
@@ -288,7 +288,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     handler.handleEvents(events);
     InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
     verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0l),
-        eq(0l), eq(0l), any(MapOutput.class));
+        eq(0l), eq(0l), any(MapOutput.class), eq(true));
   }
 
   @Test(timeout = 5000)
@@ -302,7 +302,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     handler.handleEvents(events);
     InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
     verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0l),
-        eq(0l), eq(0l), any(MapOutput.class));
+        eq(0l), eq(0l), any(MapOutput.class), eq(true));
   }
 
   @Test(timeout = 5000)

http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index ac6c6c0..3fe540c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -14,10 +14,16 @@
 
 package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.junit.Assert.assertEquals;
 
@@ -143,7 +149,7 @@ public class TestShuffleScheduler {
         MapOutput mapOutput = MapOutput
             .createMemoryMapOutput(identifiers[i], mock(FetchedInputAllocatorOrderedGrouped.class),
                 100, false);
-        scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput);
+        scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput, false);
         scheduler.freeHost(mapHosts[i]);
       }
 
@@ -155,6 +161,617 @@ public class TestShuffleScheduler {
     }
   }
 
+  /**
+   * Scenario
+   *    - reducer has not progressed enough
+   *    - reducer becomes unhealthy after some failures
+   *    - number of attempts failing exceeds maxFailedUniqueFetches (5)
+   * Expected result
+   *    - fail the reducer
+   */
+  @Test(timeout = 60000)
+  public void testReducerHealth_1() throws IOException {
+    Configuration conf = new TezConfiguration();
+    _testReducerHealth_1(conf);
+    conf.setInt(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, 4000);
+    _testReducerHealth_1(conf);
+  }
+
+  public void _testReducerHealth_1(Configuration conf) throws IOException {
+    long startTime = System.currentTimeMillis() - 500000;
+    Shuffle shuffle = mock(Shuffle.class);
+    final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320,
+        shuffle, conf);
+
+    int totalProducerNodes = 20;
+
+    //Generate 320 events
+    for (int i = 0; i < 320; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
+          10000, i, "hostUrl", inputAttemptIdentifier);
+    }
+
+    //100 succeed
+    for (int i = 0; i < 100; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      MapOutput mapOutput = MapOutput
+          .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
+              100, false);
+      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+    }
+
+    //99 inputs fail once each
+    for (int i = 100; i < 199; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), false, true, false);
+    }
+
+
+    InputAttemptIdentifier inputAttemptIdentifier =
+        new InputAttemptIdentifier(new InputIdentifier(200), 0, "attempt_");
+
+    //Should fail here and report exception as reducer is not healthy
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(200, "host" + (200 %
+        totalProducerNodes)
+        + ":" + 10000, ""), false, true, false);
+
+    int minFailurePerHost = conf.getInt(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST_DEFAULT);
+
+    if (minFailurePerHost <= 4) {
+      //Intended: fail & retrigger shuffle. NOTE(review): atLeast(0) always passes - assert real count
+      verify(shuffle, atLeast(0)).reportException(any(Throwable.class));
+    } else if (minFailurePerHost > 100) {
+      //Very high per-host threshold: an exception must still be reported at least once
+      verify(shuffle, atLeast(1)).reportException(any(Throwable.class));
+    }
+  }
+
+  /**
+   * Scenario
+   *    - reducer has progressed enough
+   *    - failures start happening after that
+   *    - number of attempts failing exceeds maxFailedUniqueFetches (5)
+   *    - Has not stalled
+   * Expected result
+   *    - Since reducer is not stalled, it should continue without error
+   *
+   * When reducer stalls, wait until enough retries are done and throw exception
+   *
+   */
+  @Test(timeout = 60000)
+  public void testReducerHealth_2() throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis() - 500000;
+    Shuffle shuffle = mock(Shuffle.class);
+    final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle);
+
+    int totalProducerNodes = 20;
+
+    //Generate 0-200 events
+    for (int i = 0; i < 200; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
+          10000, i, "hostUrl", inputAttemptIdentifier);
+    }
+    assertEquals(320, scheduler.remainingMaps.get());
+
+    //Generate 200-320 events with empty partitions
+    for (int i = 200; i < 320; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.copySucceeded(inputAttemptIdentifier, null, 0, 0, 0, null, true);
+    }
+    //120 are successful. so remaining is 200
+    assertEquals(200, scheduler.remainingMaps.get());
+
+
+    //200 pending to be downloaded. Download 190.
+    for (int i = 0; i < 190; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      MapOutput mapOutput = MapOutput
+          .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
+              100, false);
+      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+    }
+
+    assertEquals(10, scheduler.remainingMaps.get());
+
+    //10 fails
+    for (int i = 190; i < 200; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), false, true, false);
+    }
+
+    //Shuffle has not stalled. so no issues.
+    verify(scheduler.reporter, times(0)).reportException(any(Throwable.class));
+
+    //stall shuffle
+    scheduler.lastProgressTime = System.currentTimeMillis() - 250000;
+
+    InputAttemptIdentifier inputAttemptIdentifier =
+        new InputAttemptIdentifier(new InputIdentifier(190), 0, "attempt_");
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(190, "host" +
+        (190 % totalProducerNodes)
+        + ":" + 10000, ""), false, true, false);
+
+    //Even when stalled, more failures are needed. NOTE(review): 310 of 320 are done here, not 300
+    verify(scheduler.reporter, times(0)).reportException(any(Throwable.class));
+
+    assertEquals(11, scheduler.failedShufflesSinceLastCompletion);
+
+    //fail to download 50 more times across attempts
+    for (int i = 190; i < 200; i++) {
+      inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), false, true, false);
+    }
+
+    assertEquals(61, scheduler.failedShufflesSinceLastCompletion);
+    assertEquals(10, scheduler.remainingMaps.get());
+    //NOTE(review): atLeast(0) is always satisfied, so the next verify checks nothing
+    verify(shuffle, atLeast(0)).reportException(any(Throwable.class));
+
+    //fail another 30
+    for (int i = 110; i < 120; i++) {
+      inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), false, true, false);
+    }
+
+    // Should fail now due to fetcherHealthy. (stall has already happened and
+    // these are the only pending tasks)
+    verify(shuffle, atLeast(1)).reportException(any(Throwable.class));
+  }
+
+
+
+  /**
+   * Scenario
+   *    - reducer has progressed enough
+   *    - failures start happening after that in last fetch
+   *    - number of attempts failing does not exceed maxFailedUniqueFetches (5)
+   *    - Stalled
+   * Expected result
+   *    - Since reducer is stalled and if failures haven't happened across nodes,
+   *    it should be fine to proceed. AM would restart source task eventually.
+   *
+   */
+  @Test(timeout = 60000)
+  public void testReducerHealth_3() throws IOException {
+    long startTime = System.currentTimeMillis() - 500000;
+    Shuffle shuffle = mock(Shuffle.class);
+    final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle);
+
+    int totalProducerNodes = 20;
+
+    //Generate 320 events
+    for (int i = 0; i < 320; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
+          10000, i, "hostUrl", inputAttemptIdentifier);
+    }
+
+    //319 succeed
+    for (int i = 0; i < 319; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      MapOutput mapOutput = MapOutput
+          .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
+              100, false);
+      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+    }
+
+    //1 fails (last fetch)
+    InputAttemptIdentifier inputAttemptIdentifier =
+        new InputAttemptIdentifier(new InputIdentifier(319), 0, "attempt_");
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes)
+        + ":" + 10000, ""), false, true, false);
+
+    //stall the shuffle
+    scheduler.lastProgressTime = System.currentTimeMillis() - 1000000;
+
+    assertEquals(1, scheduler.remainingMaps.get());
+
+    //Retry for 3 more times
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
+        totalProducerNodes)
+        + ":" + 10000, ""), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
+        totalProducerNodes)
+        + ":" + 10000, ""), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
+        totalProducerNodes)
+        + ":" + 10000, ""), false, true, false);
+
+    // Failures are confined to a single input on one host, so no exception is expected
+    verify(shuffle, times(0)).reportException(any(Throwable.class));
+  }
+
+  /**
+   * Scenario
+   *    - reducer has progressed enough
+   *    - failures have happened randomly in nodes, but tasks are completed
+   *    - failures start happening after that in last fetch
+   *    - number of attempts failing does not exceed maxFailedUniqueFetches (5)
+   *    - Stalled
+   * Expected result
+   *    - reducer is stalled. But since errors are not seen across multiple
+   *    nodes, it is left to the AM to restart the producer. Do not kill consumer.
+   *
+   */
+  @Test(timeout = 60000)
+  public void testReducerHealth_4() throws IOException {
+    long startTime = System.currentTimeMillis() - 500000;
+    Shuffle shuffle = mock(Shuffle.class);
+    final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle);
+
+    int totalProducerNodes = 20;
+
+    //Generate 320 events
+    for (int i = 0; i < 320; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
+          10000, i, "hostUrl", inputAttemptIdentifier);
+    }
+
+    //20% of the inputs (64 of 320) fail 3 times each, but are then copied successfully
+    for (int i = 0; i < 64; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i %
+          totalProducerNodes) + ":" + 10000, ""), false, true, false);
+
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i %
+          totalProducerNodes) + ":" + 10000, ""), false, true, false);
+
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i %
+          totalProducerNodes) + ":" + 10000, ""), false, true, false);
+
+      MapOutput mapOutput = MapOutput
+          .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
+              100, false);
+      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+    }
+
+    //inputs 64-318 succeed (319 inputs copied in total)
+    for (int i = 64; i < 319; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      MapOutput mapOutput = MapOutput
+          .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
+              100, false);
+      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+    }
+
+    //1 fails (last fetch)
+    InputAttemptIdentifier inputAttemptIdentifier =
+        new InputAttemptIdentifier(new InputIdentifier(319), 0, "attempt_");
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes)
+        + ":" + 10000, ""), false, true, false);
+
+    //stall the shuffle (but within limits)
+    scheduler.lastProgressTime = System.currentTimeMillis() - 100000;
+
+    assertEquals(1, scheduler.remainingMaps.get());
+
+    //Retry for 3 more times
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
+        totalProducerNodes)
+        + ":" + 10000, ""), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
+        totalProducerNodes)
+        + ":" + 10000, ""), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
+        totalProducerNodes)
+        + ":" + 10000, ""), false, true, false);
+
+    // failedShufflesSinceLastCompletion has crossed the limits. 20% of the other inputs had failures as
+    // well. However, it has failed only in one host. So this should proceed
+    // until AM decides to restart the producer.
+    verify(shuffle, times(0)).reportException(any(Throwable.class));
+
+    //stall the shuffle for longer (300s); an exception is now expected
+    scheduler.lastProgressTime = System.currentTimeMillis() - 300000;
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
+        totalProducerNodes)
+        + ":" + 10000, ""), false, true, false);
+    verify(shuffle, times(1)).reportException(any(Throwable.class));
+
+  }
+
+  /**
+   * Scenario
+   *    - Shuffle has progressed enough
+   *    - Last event is yet to arrive
+   *    - Failures start happening after Shuffle has progressed enough
+   *    - number of attempts failing does not exceed maxFailedUniqueFetches (5)
+   *    - Stalled
+   * Expected result
+   *    - Do not throw errors, as Shuffle is yet to receive inputs
+   *
+   */
+  @Test(timeout = 60000)
+  public void testReducerHealth_5() throws IOException {
+    long startTime = System.currentTimeMillis() - 500000;
+    Shuffle shuffle = mock(Shuffle.class);
+    final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle);
+
+    int totalProducerNodes = 20;
+
+    //Generate 319 events (last event has not arrived)
+    for (int i = 0; i < 319; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
+          10000, i, "hostUrl", inputAttemptIdentifier);
+    }
+
+    //All 319 received inputs succeed
+    for (int i = 0; i < 319; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      MapOutput mapOutput = MapOutput
+          .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
+              100, false);
+      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+    }
+
+    //1 fails (input 318; NOTE(review): it was already copied above - confirm intent)
+    InputAttemptIdentifier inputAttemptIdentifier =
+        new InputAttemptIdentifier(new InputIdentifier(318), 0, "attempt_");
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 % totalProducerNodes)
+        + ":" + 10000, ""), false, true, false);
+
+    //stall the shuffle
+    scheduler.lastProgressTime = System.currentTimeMillis() - 1000000;
+
+    assertEquals(1, scheduler.remainingMaps.get());
+
+    //Retry for 3 more times
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 %
+        totalProducerNodes)
+        + ":" + 10000, ""), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 %
+        totalProducerNodes)
+        + ":" + 10000, ""), false, true, false);
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 %
+        totalProducerNodes)
+        + ":" + 10000, ""), false, true, false);
+
+    //Shuffle has not received the events completely. So do not bail out yet.
+    verify(shuffle, times(0)).reportException(any(Throwable.class));
+  }
+
+
+  /**
+   * Scenario
+   *    - Shuffle has NOT progressed enough
+   *    - Failures start happening
+   *    - number of failing attempts exceeds maxFailedUniqueFetches (5)
+   *    - Not stalled
+   * Expected result
+   *    - Bail out
+   *
+   */
+  @Test(timeout = 60000)
+  public void testReducerHealth_6() throws IOException {
+    Configuration conf = new TezConfiguration();
+    conf.setBoolean(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION,
+        true);
+    _testReducerHealth_6(conf);
+
+    conf.setBoolean(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION,
+        false);
+    _testReducerHealth_6(conf);
+
+  }
+
+  public void _testReducerHealth_6(Configuration conf) throws IOException {
+    long startTime = System.currentTimeMillis() - 500000;
+    Shuffle shuffle = mock(Shuffle.class);
+    final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320,
+        shuffle, conf);
+
+    int totalProducerNodes = 20;
+
+    //Generate all 320 events
+    for (int i = 0; i < 320; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
+          10000, i, "hostUrl", inputAttemptIdentifier);
+    }
+
+    //10 succeed
+    for (int i = 0; i < 10; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      MapOutput mapOutput = MapOutput
+          .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
+              100, false);
+      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+    }
+
+    //5 fetches fail once
+    for (int i = 10; i < 15; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), false, true, false);
+    }
+
+    assertTrue(scheduler.failureCounts.size() >= 5);
+    assertEquals(310, scheduler.remainingMaps.get());
+
+    //Do not bail out (number of failures is just 5)
+    verify(scheduler.reporter, times(0)).reportException(any(Throwable.class));
+
+    //5 fetches fail repeatedly
+    for (int i = 10; i < 15; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), false, true, false);
+    }
+
+    boolean checkFailedFetchSinceLastCompletion = conf.getBoolean
+        (TezRuntimeConfiguration
+                .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION,
+            TezRuntimeConfiguration
+                .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT);
+    if (checkFailedFetchSinceLastCompletion) {
+      // Now bail out, as Shuffle has crossed the
+      // failedShufflesSinceLastCompletion limit (even
+      // though the reducer itself may still be considered healthy).
+      verify(shuffle, atLeast(1)).reportException(any(Throwable.class));
+    } else {
+      //Do not bail out yet. NOTE(review): atLeast(0) always passes - confirm whether times(0) holds
+      verify(shuffle, atLeast(0)).reportException(any(Throwable.class));
+    }
+
+  }
+
+  /**
+   * Scenario
+   *    - reducer has not progressed enough
+   *    - fetch fails >
+   *    TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION
+   * Expected result
+   *    - fail the reducer
+   */
+  @Test(timeout = 60000)
+  public void testReducerHealth_7() throws IOException {
+    long startTime = System.currentTimeMillis() - 500000;
+    Shuffle shuffle = mock(Shuffle.class);
+    final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle);
+
+    int totalProducerNodes = 20;
+
+    //Generate 320 events
+    for (int i = 0; i < 320; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i,
+          "hostUrl", inputAttemptIdentifier);
+    }
+
+    //100 succeed
+    for (int i = 0; i < 100; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      MapOutput mapOutput = MapOutput
+          .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
+              100, false);
+      scheduler.copySucceeded(inputAttemptIdentifier,
+          new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""),
+          100, 200, startTime + (i * 100), mapOutput, false);
+    }
+
+    //99 inputs fail 4 times each
+    for (int i = 100; i < 199; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.copyFailed(inputAttemptIdentifier,
+          new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""),
+          false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier,
+          new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""),
+          false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier,
+          new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""),
+          false, true, false);
+      scheduler.copyFailed(inputAttemptIdentifier,
+          new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""),
+          false, true, false);
+    }
+
+    verify(shuffle, atLeast(1)).reportException(any(Throwable.class));
+  }
+
+  /** Creates a scheduler under test; one mocked MergeManager is passed in both merger slots. */
+  private ShuffleSchedulerForTest createScheduler(long startTime, int numInputs,
+      Shuffle shuffle, Configuration conf) throws IOException {
+    InputContext context = createTezInputContext();
+    MergeManager merger = mock(MergeManager.class);
+    return new ShuffleSchedulerForTest(context, conf, numInputs, shuffle,
+        merger, merger, startTime, null, false, 0, "srcName");
+  }
+
+  /**
+   * Creates a scheduler under test using a freshly constructed TezConfiguration.
+   */
+  private ShuffleSchedulerForTest createScheduler(long startTime, int numInputs,
+      Shuffle shuffle) throws IOException {
+    Configuration defaults = new TezConfiguration();
+    return createScheduler(startTime, numInputs, shuffle, defaults);
+  }
+
+  @Test(timeout = 60000)
+  public void testPenalty() throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    Shuffle shuffle = mock(Shuffle.class);
+    final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 1, shuffle);
+
+    InputAttemptIdentifier inputAttemptIdentifier =
+        new InputAttemptIdentifier(new InputIdentifier(0), 0, "attempt_");
+    scheduler.addKnownMapOutput("host0", 10000, 0, "hostUrl", inputAttemptIdentifier);
+
+    assertEquals(1, scheduler.pendingHosts.size());
+    MapHost mapHost = scheduler.pendingHosts.iterator().next();
+    assertEquals(MapHost.State.PENDING, mapHost.getState());
+
+    //Fails to pull from host0. host0 should be added to penalties
+    scheduler.copyFailed(inputAttemptIdentifier, mapHost, false, true, false);
+
+    //Should not get host, as it is added to penalty loop
+    MapHost host = scheduler.getHost();
+    assertFalse(host.getIdentifier(), host.getIdentifier().equalsIgnoreCase("host0:10000"));
+
+    //Referee should release host0 after INITIAL_PENALTY. NOTE(review): assert below still rejects host0
+    Thread.sleep(ShuffleScheduler.INITIAL_PENALTY + 1000);
+    host = scheduler.getHost();
+    assertFalse(host.getIdentifier(), host.getIdentifier().equalsIgnoreCase("host0:10000"));
+  }
+
   @Test(timeout = 5000)
   public void testShutdown() throws Exception {
     InputContext inputContext = createTezInputContext();
@@ -198,7 +815,7 @@ public class TestShuffleScheduler {
         MapOutput mapOutput = MapOutput
             .createMemoryMapOutput(identifiers[i], mock(FetchedInputAllocatorOrderedGrouped.class),
                 100, false);
-        scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput);
+        scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput, false);
         scheduler.freeHost(mapHosts[i]);
       }
 
@@ -234,6 +851,7 @@ public class TestShuffleScheduler {
 
     private final AtomicInteger numFetchersCreated = new AtomicInteger(0);
     private final boolean fetcherShouldWait;
+    private final ExceptionReporter reporter;
 
     public ShuffleSchedulerForTest(InputContext inputContext, Configuration conf,
                                    int numberOfInputs,
@@ -258,6 +876,7 @@ public class TestShuffleScheduler {
       super(inputContext, conf, numberOfInputs, shuffle, mergeManager, allocator, startTime, codec,
           ifileReadAhead, ifileReadAheadLength, srcNameTrimmed);
       this.fetcherShouldWait = fetcherShouldWait;
+      this.reporter = shuffle;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java
index 674d4b4..49b5490 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java
@@ -58,6 +58,23 @@ public class TestOrderedGroupedMergedKVInputConfig {
     fromConf.set("test.conf.key.1", "confkey1");
     fromConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 1111);
     fromConf.set("io.shouldExist", "io");
+    fromConf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, 3);
+    fromConf.setFloat(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION,
+        0.1f);
+    fromConf.setFloat(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION,
+        0.6f);
+    fromConf.setInt(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT,
+        10);
+    fromConf.setFloat(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION, 0.2f);
+    fromConf.setFloat(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION, 0.6f);
+    fromConf.setBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, false);
     Map<String, String> additionalConf = new HashMap<String, String>();
     additionalConf.put("test.key.2", "key2");
     additionalConf.put(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, "3");
@@ -96,6 +113,19 @@ public class TestOrderedGroupedMergedKVInputConfig {
     assertEquals(0.22f, conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f), 0.001f);
     assertEquals(0.33f, conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT, 0.0f), 0.001f);
     assertEquals(0.44f, conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 0.00f), 0.001f);
+    assertEquals(0.1f, conf.getFloat(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION, 0.00f), 0.001f);
+    assertEquals(3, conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, 0));
+    assertEquals(10, conf.getInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT, 0));
+    assertEquals(0.6f, conf.getFloat(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION, 0.00f), 0.001f);
+    assertEquals(0.2f, conf.getFloat(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION, 0.00f), 0.001f);
+    assertEquals(0.6f, conf.getFloat(TezRuntimeConfiguration // default must differ from expected
+        .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION, 0.00f), 0.001f);
+    assertEquals(false, conf.getBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, true));
 
     // Verify additional configs
     assertEquals(false, conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,