You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/03/06 03:11:15 UTC

git commit: TEZ-902. SortedMergedInput Fetcher can hang on retrying a bad input (bikas)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 66b374e58 -> 36e74ea06


TEZ-902. SortedMergedInput Fetcher can hang on retrying a bad input (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/36e74ea0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/36e74ea0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/36e74ea0

Branch: refs/heads/master
Commit: 36e74ea06ddb9831932281554fbdab8b290ced19
Parents: 66b374e
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Mar 5 18:11:06 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Mar 5 18:11:06 2014 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/app/rm/node/AMNodeImpl.java  |  2 +-
 .../input/BroadcastShuffleManager.java          |  4 +-
 .../library/common/InputAttemptIdentifier.java  | 12 ++-
 .../runtime/library/common/InputIdentifier.java | 16 ++--
 .../library/common/shuffle/impl/Fetcher.java    | 15 +++-
 .../library/common/shuffle/impl/MapOutput.java  |  2 +-
 .../common/shuffle/impl/MergeManager.java       |  4 +-
 .../shuffle/impl/ShuffleInputEventHandler.java  |  5 +-
 .../common/shuffle/impl/ShuffleScheduler.java   | 80 +++++++++++++-------
 .../local/output/TezLocalTaskOutputFiles.java   |  2 +-
 .../shuffle/common/DiskFetchedInput.java        |  2 +-
 .../library/common/TestInputIdentifiers.java    | 44 +++++++++++
 12 files changed, 140 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index 8403f09..9a23646 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -246,7 +246,7 @@ public class AMNodeImpl implements AMNode {
 
   protected void blacklistSelf() {
     for (ContainerId c : containers) {
-      sendEvent(new AMContainerEventNodeFailed(c, "Node failed"));
+      sendEvent(new AMContainerEventNodeFailed(c, "Node blacklisted"));
     }
     // these containers are not useful anymore
     pastContainers.addAll(containers);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index 54f80e4..776f186 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -515,9 +515,9 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
         "Fetch failure while fetching from "
             + TezRuntimeUtils.getTaskAttemptIdentifier(
                 inputContext.getSourceVertexName(),
-                srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(),
+                srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
                 srcAttemptIdentifier.getAttemptNumber()),
-        srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(),
+        srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
         srcAttemptIdentifier.getAttemptNumber());
     
     List<Event> failedEvents = Lists.newArrayListWithCapacity(1);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
index a13f3f1..946cb0f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
@@ -19,6 +19,7 @@
 package org.apache.tez.runtime.library.common;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.dag.api.TezUncheckedException;
 
 /**
  * Container for a task number and an attempt number for the task.
@@ -30,14 +31,20 @@ public class InputAttemptIdentifier {
   private final int attemptNumber;
   private String pathComponent;
   
-  public InputAttemptIdentifier(int taskIndex, int attemptNumber) {
-    this(new InputIdentifier(taskIndex), attemptNumber, null);
+  public static String PATH_PREFIX = "attempt";
+  
+  public InputAttemptIdentifier(int inputIndex, int attemptNumber) {
+    this(new InputIdentifier(inputIndex), attemptNumber, null);
   }
   
   public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent) {
     this.inputIdentifier = inputIdentifier;
     this.attemptNumber = attemptNumber;
     this.pathComponent = pathComponent;
+    if (pathComponent != null && !pathComponent.startsWith(PATH_PREFIX)) {
+      throw new TezUncheckedException(
+          "Path component must start with: " + PATH_PREFIX + this);
+    }
   }
   
   public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent) {
@@ -83,6 +90,7 @@ public class InputAttemptIdentifier {
         return false;
     } else if (!inputIdentifier.equals(other.inputIdentifier))
       return false;
+    // do not compare pathComponent as they may not always be present
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java
index f4ce190..2513325 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java
@@ -20,19 +20,19 @@ package org.apache.tez.runtime.library.common;
 
 public class InputIdentifier {
 
-  private final int srcTaskIndex;
+  private final int inputIndex;
   
-  public InputIdentifier(int srcTaskIndex) {
-    this.srcTaskIndex = srcTaskIndex;
+  public InputIdentifier(int srcInputIndex) {
+    this.inputIndex = srcInputIndex;
   }
 
-  public int getSrcTaskIndex() {
-    return this.srcTaskIndex;
+  public int getInputIndex() {
+    return this.inputIndex;
   }
 
   @Override
   public int hashCode() {
-    return srcTaskIndex;
+    return inputIndex;
   }
 
   @Override
@@ -44,13 +44,13 @@ public class InputIdentifier {
     if (getClass() != obj.getClass())
       return false;
     InputIdentifier other = (InputIdentifier) obj;
-    if (srcTaskIndex != other.srcTaskIndex)
+    if (inputIndex != other.inputIndex)
       return false;
     return true;
   }
 
   @Override
   public String toString() {
-    return "InputIdentifier [srcTaskIndex=" + srcTaskIndex + "]";
+    return "InputIdentifier [inputIndex=" + inputIndex + "]";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index 042e8f5..4fd9b53 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -303,12 +303,14 @@ class Fetcher extends Thread {
         for(InputAttemptIdentifier left: remaining) {
           scheduler.copyFailed(left, host, connectSucceeded);
         }
+        remaining.clear();
       } else {
         // If we got a read error at this stage, it implies there was a problem
         // with the first map, typically lost map. So, penalize only that map
         // and add the rest
         InputAttemptIdentifier firstMap = srcAttempts.get(0);
         scheduler.copyFailed(firstMap, host, connectSucceeded);
+        remaining.remove(firstMap);
       }
       
       // Add back all the remaining maps, WITHOUT marking them as failed
@@ -327,6 +329,9 @@ class Fetcher extends Thread {
       // yet_to_be_fetched list and marking the failed tasks.
       InputAttemptIdentifier[] failedTasks = null;
       while (!remaining.isEmpty() && failedTasks == null) {
+        // Fail immediately after the first failure because we don't know how much to
+        // skip for this error in the input stream, so we cannot move on to the
+        // remaining outputs (YARN-1773). We will get to them in the next retry.
         failedTasks = copyMapOutput(host, input);
       }
       
@@ -334,6 +339,7 @@ class Fetcher extends Thread {
         LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
         for(InputAttemptIdentifier left: failedTasks) {
           scheduler.copyFailed(left, host, true);
+          remaining.remove(left);
         }
       }
       
@@ -367,6 +373,10 @@ class Fetcher extends Thread {
       try {
         ShuffleHeader header = new ShuffleHeader();
         header.readFields(input);
+        if (!header.mapId.startsWith(InputAttemptIdentifier.PATH_PREFIX)) {
+          throw new IllegalArgumentException(
+              "Invalid header received: " + header.mapId + " partition: " + header.forReduce);
+        }
         srcAttemptId = 
             scheduler.getIdentifierForFetchedOutput(header.mapId, header.forReduce);
         compressedLength = header.compressedLength;
@@ -375,8 +385,9 @@ class Fetcher extends Thread {
       } catch (IllegalArgumentException e) {
         badIdErrs.increment(1);
         LOG.warn("Invalid map id ", e);
-        //Don't know which one was bad, so consider all of them as bad
-        return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
+        // Don't know which one was bad, so consider this one bad and don't read
+        // the remaining because we don't know where to start reading from. YARN-1773
+        return new InputAttemptIdentifier[] {srcAttemptId = getNextRemainingAttempt()};
       }
 
  

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
index 9f673a0..ef741ae 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
@@ -80,7 +80,7 @@ class MapOutput {
     
     this.localFS = FileSystem.getLocal(conf);
     outputPath =
-      mapOutputFile.getInputFileForWrite(this.attemptIdentifier.getInputIdentifier().getSrcTaskIndex(), size);
+      mapOutputFile.getInputFileForWrite(this.attemptIdentifier.getInputIdentifier().getInputIndex(), size);
     tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
 
     disk = localFS.create(tmpOutputPath);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
index 051806c..2fb6b08 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
@@ -550,7 +550,7 @@ public class MergeManager {
       int noInMemorySegments = inMemorySegments.size();
 
       Path outputPath = mapOutputFile.getInputFileForWrite(
-          srcTaskIdentifier.getInputIdentifier().getSrcTaskIndex(),
+          srcTaskIdentifier.getInputIdentifier().getInputIndex(),
           mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
 
       Writer writer = null;
@@ -761,7 +761,7 @@ public class MergeManager {
     long inMemToDiskBytes = 0;
     boolean mergePhaseFinished = false;
     if (inMemoryMapOutputs.size() > 0) {
-      int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getSrcTaskIndex();
+      int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getInputIndex();
       inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, 
                                                 memDiskSegments,
                                                 this.postMergeMemLimit);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index 2dcabe1..2676a19 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -72,7 +72,8 @@ public class ShuffleInputEventHandler {
     } 
     int partitionId = dmEvent.getSourceIndex();
     URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
-    InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
+    InputAttemptIdentifier srcAttemptIdentifier = 
+        new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
     LOG.info("DataMovementEvent baseUri:" + baseUri + ", src: " + srcAttemptIdentifier);
     scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier);
     
@@ -86,7 +87,7 @@ public class ShuffleInputEventHandler {
   
   private void processTaskFailedEvent(InputFailedEvent ifEvent) {
     InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getTargetIndex(), ifEvent.getVersion());
-    scheduler.obsoleteMapOutput(taIdentifier);
+    scheduler.obsoleteInput(taIdentifier);
     LOG.info("Obsoleting output of src-task: " + taIdentifier);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index 3ff7d6b..9106f95 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -67,7 +68,7 @@ class ShuffleScheduler {
   //TODO NEWTEZ Clean this and other maps at some point
   private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>(); 
   private Set<MapHost> pendingHosts = new HashSet<MapHost>();
-  private Set<InputAttemptIdentifier> obsoleteMaps = new HashSet<InputAttemptIdentifier>();
+  private Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<InputAttemptIdentifier>();
   
   private final Random random = new Random(System.currentTimeMillis());
   private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
@@ -132,13 +133,13 @@ class ShuffleScheduler {
                                          long milis,
                                          MapOutput output
                                          ) throws IOException {
-    String taskIdentifier = TezRuntimeUtils.getTaskAttemptIdentifier(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), srcAttemptIdentifier.getAttemptNumber());
+    String taskIdentifier = TezRuntimeUtils.getTaskAttemptIdentifier(srcAttemptIdentifier.getInputIdentifier().getInputIndex(), srcAttemptIdentifier.getAttemptNumber());
     failureCounts.remove(taskIdentifier);
     hostFailures.remove(host.getHostName());
     
-    if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex())) {
+    if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
       output.commit();
-      setInputFinished(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex());
+      setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex());
       shuffledMapsCounter.increment(1);
       if (--remainingMaps == 0) {
         notifyAll();
@@ -152,7 +153,7 @@ class ShuffleScheduler {
       if (LOG.isDebugEnabled()) {
         LOG.debug("src task: "
             + TezRuntimeUtils.getTaskAttemptIdentifier(
-                inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(),
+                inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
                 srcAttemptIdentifier.getAttemptNumber()) + " done");
       }
     }
@@ -192,13 +193,13 @@ class ShuffleScheduler {
       IOException ioe = new IOException(failures
             + " failures downloading "
             + TezRuntimeUtils.getTaskAttemptIdentifier(
-                inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
+                inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
                 srcAttempt.getAttemptNumber()));
       ioe.fillInStackTrace();
       shuffle.reportException(ioe);
     }
     
-    checkAndInformJobTracker(failures, srcAttempt, readError);
+    checkAndInformAM(failures, srcAttempt, readError);
 
     checkReducerHealth();
     
@@ -213,21 +214,21 @@ class ShuffleScheduler {
   // Notify the JobTracker  
   // after every read error, if 'reportReadErrorImmediately' is true or
   // after every 'maxFetchFailuresBeforeReporting' failures
-  private void checkAndInformJobTracker(
+  private void checkAndInformAM(
       int failures, InputAttemptIdentifier srcAttempt, boolean readError) {
     if ((reportReadErrorImmediately && readError)
         || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
-      LOG.info("Reporting fetch failure for "
+      LOG.info("Reporting fetch failure for InputIdentifier: " 
+          + srcAttempt + " taskAttemptIdentifier: "
           + TezRuntimeUtils.getTaskAttemptIdentifier(
-              inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
-              srcAttempt.getAttemptNumber()) + " to jobtracker.");
-
+              inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
+              srcAttempt.getAttemptNumber()) + " to AM.");
       List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
       failedEvents.add(new InputReadErrorEvent("Fetch failure for "
           + TezRuntimeUtils.getTaskAttemptIdentifier(
-              inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
+              inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
               srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt.getInputIdentifier()
-          .getSrcTaskIndex(), srcAttempt.getAttemptNumber()));
+          .getInputIndex(), srcAttempt.getAttemptNumber()));
 
       inputContext.sendEvents(failedEvents);      
       //status.addFailedDependency(mapId);
@@ -305,9 +306,10 @@ class ShuffleScheduler {
     }
   }
   
-  public synchronized void obsoleteMapOutput(InputAttemptIdentifier srcAttempt) {
+  public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) {
     // The incoming srcAttempt does not contain a path component.
-    obsoleteMaps.add(srcAttempt);
+    LOG.info("Adding obsolete input: " + srcAttempt);
+    obsoleteInputs.add(srcAttempt);
   }
   
   public synchronized void putBackKnownMapOutput(MapHost host,
@@ -342,27 +344,53 @@ class ShuffleScheduler {
     return pathToIdentifierMap.get(getIdentifierFromPathAndReduceId(path, reduceId));
   }
   
+  private boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
+    return (!obsoleteInputs.contains(id) && 
+             !isInputFinished(id.getInputIdentifier().getInputIndex()));
+  }
+  
   public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
-    List<InputAttemptIdentifier> list = host.getAndClearKnownMaps();
-    Iterator<InputAttemptIdentifier> itr = list.iterator();
+    List<InputAttemptIdentifier> origList = host.getAndClearKnownMaps();
+    Map<Integer, InputAttemptIdentifier> dedupedList = new LinkedHashMap<Integer, InputAttemptIdentifier>();
+    Iterator<InputAttemptIdentifier> listItr = origList.iterator();
+    while (listItr.hasNext()) {
+      // we may want to try all versions of the input but with current retry 
+      // behavior older ones are likely to be lost and should be ignored.
+      // This may be removed after TEZ-914
+      InputAttemptIdentifier id = listItr.next();
+      Integer inputNumber = new Integer(id.getInputIdentifier().getInputIndex());
+      InputAttemptIdentifier oldId = dedupedList.get(inputNumber);
+      if (oldId == null || oldId.getAttemptNumber() < id.getAttemptNumber()) {
+        dedupedList.put(inputNumber, id);
+        if (oldId != null) {
+          LOG.warn("Ignoring older source: " + oldId + 
+              " in favor of newer source: " + id);
+        }
+      }
+    }
     List<InputAttemptIdentifier> result = new ArrayList<InputAttemptIdentifier>();
     int includedMaps = 0;
-    int totalSize = list.size();
+    int totalSize = dedupedList.size();
+    Iterator<Map.Entry<Integer, InputAttemptIdentifier>> dedupedItr = dedupedList.entrySet().iterator();
     // find the maps that we still need, up to the limit
-    while (itr.hasNext()) {
-      InputAttemptIdentifier id = itr.next();
-      if (!obsoleteMaps.contains(id) && !isInputFinished(id.getInputIdentifier().getSrcTaskIndex())) {
+    while (dedupedItr.hasNext()) {
+      InputAttemptIdentifier id = dedupedItr.next().getValue();
+      if (inputShouldBeConsumed(id)) {
         result.add(id);
         if (++includedMaps >= MAX_MAPS_AT_ONCE) {
           break;
         }
+      } else {
+        LOG.info("Ignoring finished or obsolete source: " + id);
       }
     }
     // put back the maps left after the limit
-    while (itr.hasNext()) {
-      InputAttemptIdentifier id = itr.next();
-      if (!obsoleteMaps.contains(id) && !isInputFinished(id.getInputIdentifier().getSrcTaskIndex())) {
+    while (dedupedItr.hasNext()) {
+      InputAttemptIdentifier id = dedupedItr.next().getValue();
+      if (inputShouldBeConsumed(id)) {
         host.addKnownMap(id);
+      } else {
+        LOG.info("Ignoring finished or obsolete source: " + id);
       }
     }
     LOG.info("assigned " + includedMaps + " of " + totalSize + " to " +
@@ -383,7 +411,7 @@ class ShuffleScheduler {
     
   public synchronized void resetKnownMaps() {
     mapLocations.clear();
-    obsoleteMaps.clear();
+    obsoleteInputs.clear();
     pendingHosts.clear();
     pathToIdentifierMap.clear();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
index 7b61b98..e863f47 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
@@ -207,7 +207,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
       throws IOException {
     return lDirAlloc.getLocalPathToRead(String.format(
         Constants.TEZ_RUNTIME_TASK_INPUT_FILE_FORMAT_STRING, 
-        Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, Integer.valueOf(mapId.getInputIdentifier().getSrcTaskIndex())), conf);
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, Integer.valueOf(mapId.getInputIdentifier().getInputIndex())), conf);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
index 06b9314..1d26c6e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
@@ -50,7 +50,7 @@ public class DiskFetchedInput extends FetchedInput {
 
     this.localFS = FileSystem.getLocal(conf);
     this.outputPath = filenameAllocator.getInputFileForWrite(
-        this.inputAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), actualSize);
+        this.inputAttemptIdentifier.getInputIdentifier().getInputIndex(), actualSize);
     // Files are not clobbered due to the id being appended to the outputPath in the tmpPath,
     // otherwise fetches for the same task but from different attempts would clobber each other.
     this.tmpOutputPath = outputPath.suffix(String.valueOf(id));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java
new file mode 100644
index 0000000..b173054
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestInputIdentifiers {
+  
+  @Test
+  public void testInputAttemptIdentifier() {
+    Set<InputAttemptIdentifier> set = new HashSet<InputAttemptIdentifier>();
+    InputAttemptIdentifier i1 = new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX);
+    InputAttemptIdentifier i2 = new InputAttemptIdentifier(1, 1, null);
+    InputAttemptIdentifier i3 = new InputAttemptIdentifier(1, 0, null);
+    InputAttemptIdentifier i4 = new InputAttemptIdentifier(0, 1, null);
+    
+    Assert.assertTrue(set.add(i1));
+    Assert.assertFalse(set.add(i1));
+    Assert.assertFalse(set.add(i2));
+    Assert.assertTrue(set.add(i3));
+    Assert.assertTrue(set.add(i4));
+  }
+
+}