You are viewing a plain-text version of this content. The canonical link to it appeared as "here" in the original page and is not preserved in this copy.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/03/06 03:11:15 UTC
git commit: TEZ-902. SortedMergedInput Fetcher can hang on retrying a bad input (bikas)
Repository: incubator-tez
Updated Branches:
refs/heads/master 66b374e58 -> 36e74ea06
TEZ-902. SortedMergedInput Fetcher can hang on retrying a bad input (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/36e74ea0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/36e74ea0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/36e74ea0
Branch: refs/heads/master
Commit: 36e74ea06ddb9831932281554fbdab8b290ced19
Parents: 66b374e
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Mar 5 18:11:06 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Mar 5 18:11:06 2014 -0800
----------------------------------------------------------------------
.../apache/tez/dag/app/rm/node/AMNodeImpl.java | 2 +-
.../input/BroadcastShuffleManager.java | 4 +-
.../library/common/InputAttemptIdentifier.java | 12 ++-
.../runtime/library/common/InputIdentifier.java | 16 ++--
.../library/common/shuffle/impl/Fetcher.java | 15 +++-
.../library/common/shuffle/impl/MapOutput.java | 2 +-
.../common/shuffle/impl/MergeManager.java | 4 +-
.../shuffle/impl/ShuffleInputEventHandler.java | 5 +-
.../common/shuffle/impl/ShuffleScheduler.java | 80 +++++++++++++-------
.../local/output/TezLocalTaskOutputFiles.java | 2 +-
.../shuffle/common/DiskFetchedInput.java | 2 +-
.../library/common/TestInputIdentifiers.java | 44 +++++++++++
12 files changed, 140 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index 8403f09..9a23646 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -246,7 +246,7 @@ public class AMNodeImpl implements AMNode {
protected void blacklistSelf() {
for (ContainerId c : containers) {
- sendEvent(new AMContainerEventNodeFailed(c, "Node failed"));
+ sendEvent(new AMContainerEventNodeFailed(c, "Node blacklisted"));
}
// these containers are not useful anymore
pastContainers.addAll(containers);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index 54f80e4..776f186 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -515,9 +515,9 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
"Fetch failure while fetching from "
+ TezRuntimeUtils.getTaskAttemptIdentifier(
inputContext.getSourceVertexName(),
- srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(),
+ srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
srcAttemptIdentifier.getAttemptNumber()),
- srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(),
+ srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
srcAttemptIdentifier.getAttemptNumber());
List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
index a13f3f1..946cb0f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
@@ -19,6 +19,7 @@
package org.apache.tez.runtime.library.common;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.dag.api.TezUncheckedException;
/**
* Container for a task number and an attempt number for the task.
@@ -30,14 +31,20 @@ public class InputAttemptIdentifier {
private final int attemptNumber;
private String pathComponent;
- public InputAttemptIdentifier(int taskIndex, int attemptNumber) {
- this(new InputIdentifier(taskIndex), attemptNumber, null);
+ public static String PATH_PREFIX = "attempt";
+
+ public InputAttemptIdentifier(int inputIndex, int attemptNumber) {
+ this(new InputIdentifier(inputIndex), attemptNumber, null);
}
public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent) {
this.inputIdentifier = inputIdentifier;
this.attemptNumber = attemptNumber;
this.pathComponent = pathComponent;
+ if (pathComponent != null && !pathComponent.startsWith(PATH_PREFIX)) {
+ throw new TezUncheckedException(
+ "Path component must start with: " + PATH_PREFIX + this);
+ }
}
public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent) {
@@ -83,6 +90,7 @@ public class InputAttemptIdentifier {
return false;
} else if (!inputIdentifier.equals(other.inputIdentifier))
return false;
+ // do not compare pathComponent as they may not always be present
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java
index f4ce190..2513325 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputIdentifier.java
@@ -20,19 +20,19 @@ package org.apache.tez.runtime.library.common;
public class InputIdentifier {
- private final int srcTaskIndex;
+ private final int inputIndex;
- public InputIdentifier(int srcTaskIndex) {
- this.srcTaskIndex = srcTaskIndex;
+ public InputIdentifier(int srcInputIndex) {
+ this.inputIndex = srcInputIndex;
}
- public int getSrcTaskIndex() {
- return this.srcTaskIndex;
+ public int getInputIndex() {
+ return this.inputIndex;
}
@Override
public int hashCode() {
- return srcTaskIndex;
+ return inputIndex;
}
@Override
@@ -44,13 +44,13 @@ public class InputIdentifier {
if (getClass() != obj.getClass())
return false;
InputIdentifier other = (InputIdentifier) obj;
- if (srcTaskIndex != other.srcTaskIndex)
+ if (inputIndex != other.inputIndex)
return false;
return true;
}
@Override
public String toString() {
- return "InputIdentifier [srcTaskIndex=" + srcTaskIndex + "]";
+ return "InputIdentifier [inputIndex=" + inputIndex + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index 042e8f5..4fd9b53 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -303,12 +303,14 @@ class Fetcher extends Thread {
for(InputAttemptIdentifier left: remaining) {
scheduler.copyFailed(left, host, connectSucceeded);
}
+ remaining.clear();
} else {
// If we got a read error at this stage, it implies there was a problem
// with the first map, typically lost map. So, penalize only that map
// and add the rest
InputAttemptIdentifier firstMap = srcAttempts.get(0);
scheduler.copyFailed(firstMap, host, connectSucceeded);
+ remaining.remove(firstMap);
}
// Add back all the remaining maps, WITHOUT marking them as failed
@@ -327,6 +329,9 @@ class Fetcher extends Thread {
// yet_to_be_fetched list and marking the failed tasks.
InputAttemptIdentifier[] failedTasks = null;
while (!remaining.isEmpty() && failedTasks == null) {
+ // Fail immediately after the first failure because we don't know how much
+ // to skip for this error in the input stream, so we cannot move on to the
+ // remaining outputs (YARN-1773). They will be fetched in the next retry.
failedTasks = copyMapOutput(host, input);
}
@@ -334,6 +339,7 @@ class Fetcher extends Thread {
LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
for(InputAttemptIdentifier left: failedTasks) {
scheduler.copyFailed(left, host, true);
+ remaining.remove(left);
}
}
@@ -367,6 +373,10 @@ class Fetcher extends Thread {
try {
ShuffleHeader header = new ShuffleHeader();
header.readFields(input);
+ if (!header.mapId.startsWith(InputAttemptIdentifier.PATH_PREFIX)) {
+ throw new IllegalArgumentException(
+ "Invalid header received: " + header.mapId + " partition: " + header.forReduce);
+ }
srcAttemptId =
scheduler.getIdentifierForFetchedOutput(header.mapId, header.forReduce);
compressedLength = header.compressedLength;
@@ -375,8 +385,9 @@ class Fetcher extends Thread {
} catch (IllegalArgumentException e) {
badIdErrs.increment(1);
LOG.warn("Invalid map id ", e);
- //Don't know which one was bad, so consider all of them as bad
- return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
+ // Don't know which one was bad, so consider this one bad and don't read
+ // the remaining ones because we don't know where to start reading from. YARN-1773
+ return new InputAttemptIdentifier[] {srcAttemptId = getNextRemainingAttempt()};
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
index 9f673a0..ef741ae 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapOutput.java
@@ -80,7 +80,7 @@ class MapOutput {
this.localFS = FileSystem.getLocal(conf);
outputPath =
- mapOutputFile.getInputFileForWrite(this.attemptIdentifier.getInputIdentifier().getSrcTaskIndex(), size);
+ mapOutputFile.getInputFileForWrite(this.attemptIdentifier.getInputIdentifier().getInputIndex(), size);
tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
disk = localFS.create(tmpOutputPath);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
index 051806c..2fb6b08 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
@@ -550,7 +550,7 @@ public class MergeManager {
int noInMemorySegments = inMemorySegments.size();
Path outputPath = mapOutputFile.getInputFileForWrite(
- srcTaskIdentifier.getInputIdentifier().getSrcTaskIndex(),
+ srcTaskIdentifier.getInputIdentifier().getInputIndex(),
mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
Writer writer = null;
@@ -761,7 +761,7 @@ public class MergeManager {
long inMemToDiskBytes = 0;
boolean mergePhaseFinished = false;
if (inMemoryMapOutputs.size() > 0) {
- int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getSrcTaskIndex();
+ int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getInputIndex();
inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs,
memDiskSegments,
this.postMergeMemLimit);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index 2dcabe1..2676a19 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -72,7 +72,8 @@ public class ShuffleInputEventHandler {
}
int partitionId = dmEvent.getSourceIndex();
URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
- InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
+ InputAttemptIdentifier srcAttemptIdentifier =
+ new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
LOG.info("DataMovementEvent baseUri:" + baseUri + ", src: " + srcAttemptIdentifier);
scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier);
@@ -86,7 +87,7 @@ public class ShuffleInputEventHandler {
private void processTaskFailedEvent(InputFailedEvent ifEvent) {
InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getTargetIndex(), ifEvent.getVersion());
- scheduler.obsoleteMapOutput(taIdentifier);
+ scheduler.obsoleteInput(taIdentifier);
LOG.info("Obsoleting output of src-task: " + taIdentifier);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index 3ff7d6b..9106f95 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -67,7 +68,7 @@ class ShuffleScheduler {
//TODO NEWTEZ Clean this and other maps at some point
private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>();
private Set<MapHost> pendingHosts = new HashSet<MapHost>();
- private Set<InputAttemptIdentifier> obsoleteMaps = new HashSet<InputAttemptIdentifier>();
+ private Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<InputAttemptIdentifier>();
private final Random random = new Random(System.currentTimeMillis());
private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
@@ -132,13 +133,13 @@ class ShuffleScheduler {
long milis,
MapOutput output
) throws IOException {
- String taskIdentifier = TezRuntimeUtils.getTaskAttemptIdentifier(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), srcAttemptIdentifier.getAttemptNumber());
+ String taskIdentifier = TezRuntimeUtils.getTaskAttemptIdentifier(srcAttemptIdentifier.getInputIdentifier().getInputIndex(), srcAttemptIdentifier.getAttemptNumber());
failureCounts.remove(taskIdentifier);
hostFailures.remove(host.getHostName());
- if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex())) {
+ if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
output.commit();
- setInputFinished(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex());
+ setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex());
shuffledMapsCounter.increment(1);
if (--remainingMaps == 0) {
notifyAll();
@@ -152,7 +153,7 @@ class ShuffleScheduler {
if (LOG.isDebugEnabled()) {
LOG.debug("src task: "
+ TezRuntimeUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(),
+ inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
srcAttemptIdentifier.getAttemptNumber()) + " done");
}
}
@@ -192,13 +193,13 @@ class ShuffleScheduler {
IOException ioe = new IOException(failures
+ " failures downloading "
+ TezRuntimeUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
+ inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
srcAttempt.getAttemptNumber()));
ioe.fillInStackTrace();
shuffle.reportException(ioe);
}
- checkAndInformJobTracker(failures, srcAttempt, readError);
+ checkAndInformAM(failures, srcAttempt, readError);
checkReducerHealth();
@@ -213,21 +214,21 @@ class ShuffleScheduler {
// Notify the JobTracker
// after every read error, if 'reportReadErrorImmediately' is true or
// after every 'maxFetchFailuresBeforeReporting' failures
- private void checkAndInformJobTracker(
+ private void checkAndInformAM(
int failures, InputAttemptIdentifier srcAttempt, boolean readError) {
if ((reportReadErrorImmediately && readError)
|| ((failures % maxFetchFailuresBeforeReporting) == 0)) {
- LOG.info("Reporting fetch failure for "
+ LOG.info("Reporting fetch failure for InputIdentifier: "
+ + srcAttempt + " taskAttemptIdentifier: "
+ TezRuntimeUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
- srcAttempt.getAttemptNumber()) + " to jobtracker.");
-
+ inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
+ srcAttempt.getAttemptNumber()) + " to AM.");
List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
failedEvents.add(new InputReadErrorEvent("Fetch failure for "
+ TezRuntimeUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
+ inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt.getInputIdentifier()
- .getSrcTaskIndex(), srcAttempt.getAttemptNumber()));
+ .getInputIndex(), srcAttempt.getAttemptNumber()));
inputContext.sendEvents(failedEvents);
//status.addFailedDependency(mapId);
@@ -305,9 +306,10 @@ class ShuffleScheduler {
}
}
- public synchronized void obsoleteMapOutput(InputAttemptIdentifier srcAttempt) {
+ public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) {
// The incoming srcAttempt does not contain a path component.
- obsoleteMaps.add(srcAttempt);
+ LOG.info("Adding obsolete input: " + srcAttempt);
+ obsoleteInputs.add(srcAttempt);
}
public synchronized void putBackKnownMapOutput(MapHost host,
@@ -342,27 +344,53 @@ class ShuffleScheduler {
return pathToIdentifierMap.get(getIdentifierFromPathAndReduceId(path, reduceId));
}
+ private boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
+ return (!obsoleteInputs.contains(id) &&
+ !isInputFinished(id.getInputIdentifier().getInputIndex()));
+ }
+
public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
- List<InputAttemptIdentifier> list = host.getAndClearKnownMaps();
- Iterator<InputAttemptIdentifier> itr = list.iterator();
+ List<InputAttemptIdentifier> origList = host.getAndClearKnownMaps();
+ Map<Integer, InputAttemptIdentifier> dedupedList = new LinkedHashMap<Integer, InputAttemptIdentifier>();
+ Iterator<InputAttemptIdentifier> listItr = origList.iterator();
+ while (listItr.hasNext()) {
+ // we may want to try all versions of the input but with current retry
+ // behavior older ones are likely to be lost and should be ignored.
+ // This may be removed after TEZ-914
+ InputAttemptIdentifier id = listItr.next();
+ Integer inputNumber = new Integer(id.getInputIdentifier().getInputIndex());
+ InputAttemptIdentifier oldId = dedupedList.get(inputNumber);
+ if (oldId == null || oldId.getAttemptNumber() < id.getAttemptNumber()) {
+ dedupedList.put(inputNumber, id);
+ if (oldId != null) {
+ LOG.warn("Ignoring older source: " + oldId +
+ " in favor of newer source: " + id);
+ }
+ }
+ }
List<InputAttemptIdentifier> result = new ArrayList<InputAttemptIdentifier>();
int includedMaps = 0;
- int totalSize = list.size();
+ int totalSize = dedupedList.size();
+ Iterator<Map.Entry<Integer, InputAttemptIdentifier>> dedupedItr = dedupedList.entrySet().iterator();
// find the maps that we still need, up to the limit
- while (itr.hasNext()) {
- InputAttemptIdentifier id = itr.next();
- if (!obsoleteMaps.contains(id) && !isInputFinished(id.getInputIdentifier().getSrcTaskIndex())) {
+ while (dedupedItr.hasNext()) {
+ InputAttemptIdentifier id = dedupedItr.next().getValue();
+ if (inputShouldBeConsumed(id)) {
result.add(id);
if (++includedMaps >= MAX_MAPS_AT_ONCE) {
break;
}
+ } else {
+ LOG.info("Ignoring finished or obsolete source: " + id);
}
}
// put back the maps left after the limit
- while (itr.hasNext()) {
- InputAttemptIdentifier id = itr.next();
- if (!obsoleteMaps.contains(id) && !isInputFinished(id.getInputIdentifier().getSrcTaskIndex())) {
+ while (dedupedItr.hasNext()) {
+ InputAttemptIdentifier id = dedupedItr.next().getValue();
+ if (inputShouldBeConsumed(id)) {
host.addKnownMap(id);
+ } else {
+ LOG.info("Ignoring finished or obsolete source: " + id);
}
}
LOG.info("assigned " + includedMaps + " of " + totalSize + " to " +
@@ -383,7 +411,7 @@ class ShuffleScheduler {
public synchronized void resetKnownMaps() {
mapLocations.clear();
- obsoleteMaps.clear();
+ obsoleteInputs.clear();
pendingHosts.clear();
pathToIdentifierMap.clear();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
index 7b61b98..e863f47 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
@@ -207,7 +207,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
throws IOException {
return lDirAlloc.getLocalPathToRead(String.format(
Constants.TEZ_RUNTIME_TASK_INPUT_FILE_FORMAT_STRING,
- Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, Integer.valueOf(mapId.getInputIdentifier().getSrcTaskIndex())), conf);
+ Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, Integer.valueOf(mapId.getInputIdentifier().getInputIndex())), conf);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
index 06b9314..1d26c6e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
@@ -50,7 +50,7 @@ public class DiskFetchedInput extends FetchedInput {
this.localFS = FileSystem.getLocal(conf);
this.outputPath = filenameAllocator.getInputFileForWrite(
- this.inputAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), actualSize);
+ this.inputAttemptIdentifier.getInputIdentifier().getInputIndex(), actualSize);
// Files are not clobbered due to the id being appended to the outputPath in the tmpPath,
// otherwise fetches for the same task but from different attempts would clobber each other.
this.tmpOutputPath = outputPath.suffix(String.valueOf(id));
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e74ea0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java
new file mode 100644
index 0000000..b173054
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestInputIdentifiers {
+
+ @Test
+ public void testInputAttemptIdentifier() {
+ Set<InputAttemptIdentifier> set = new HashSet<InputAttemptIdentifier>();
+ InputAttemptIdentifier i1 = new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX);
+ InputAttemptIdentifier i2 = new InputAttemptIdentifier(1, 1, null);
+ InputAttemptIdentifier i3 = new InputAttemptIdentifier(1, 0, null);
+ InputAttemptIdentifier i4 = new InputAttemptIdentifier(0, 1, null);
+
+ Assert.assertTrue(set.add(i1));
+ Assert.assertFalse(set.add(i1));
+ Assert.assertFalse(set.add(i2));
+ Assert.assertTrue(set.add(i3));
+ Assert.assertTrue(set.add(i4));
+ }
+
+}