You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/10 19:00:43 UTC
git commit: TEZ-918. Fix a bug which could cause shuffle to hang if
there are intermittent fetch failures. (sseth)
Repository: incubator-tez
Updated Branches:
refs/heads/master 1b170a4ec -> 98b49f227
TEZ-918. Fix a bug which could cause shuffle to hang if there are intermittent fetch failures.
(sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/98b49f22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/98b49f22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/98b49f22
Branch: refs/heads/master
Commit: 98b49f22790e5116e1fd48fa96b3b01c2359ffaf
Parents: 1b170a4
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Mar 10 11:00:11 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Mar 10 11:00:11 2014 -0700
----------------------------------------------------------------------
.../org/apache/tez/common/TezJobConfig.java | 2 +-
.../library/common/shuffle/impl/Fetcher.java | 84 ++++++++++--------
.../library/common/shuffle/impl/Shuffle.java | 4 +
.../common/shuffle/impl/ShuffleScheduler.java | 89 ++++++++++++--------
4 files changed, 107 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98b49f22/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index a3d6561..b34eaf2 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -194,7 +194,7 @@ public class TezJobConfig {
*/
public static final String TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES =
"tez.runtime.shuffle.fetch.failures.limit";
- public final static int DEFAULT_TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT = 10;
+ public final static int DEFAULT_TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT = 5;
/**
*
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98b49f22/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index 97b57f3..a81b83e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -76,6 +76,7 @@ class Fetcher extends Thread {
private final Shuffle shuffle;
private final int id;
private static int nextId = 0;
+ private int currentPartition = -1;
private final int connectionTimeout;
private final int readTimeout;
@@ -87,8 +88,6 @@ class Fetcher extends Thread {
private final SecretKey jobTokenSecret;
private volatile boolean stopped = false;
-
- private Configuration job;
private final boolean ifileReadAhead;
private final int ifileReadAheadLength;
@@ -102,7 +101,6 @@ class Fetcher extends Thread {
ShuffleScheduler scheduler, MergeManager merger,
ShuffleClientMetrics metrics,
Shuffle shuffle, SecretKey jobTokenSecret, boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec, TezInputContext inputContext) throws IOException {
- this.job = job;
this.scheduler = scheduler;
this.merger = merger;
this.metrics = metrics;
@@ -228,6 +226,7 @@ class Fetcher extends Thread {
protected void copyFromHost(MapHost host) throws IOException {
// Get completed maps on 'host'
List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
+ currentPartition = host.getPartitionId();
// Sanity check to catch hosts with only 'OBSOLETE' maps,
// especially at the tail of large jobs
@@ -237,7 +236,7 @@ class Fetcher extends Thread {
if(LOG.isDebugEnabled()) {
LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
- + srcAttempts);
+ + srcAttempts + ", partitionId: " + currentPartition);
}
// List of maps to be fetched yet
@@ -294,31 +293,28 @@ class Fetcher extends Thread {
LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
} catch (IOException ie) {
ioErrs.increment(1);
- LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
- " map outputs", ie);
-
- // If connect did not succeed, just mark all the maps as failed,
- // indirectly penalizing the host
if (!connectSucceeded) {
- for(InputAttemptIdentifier left: remaining) {
- scheduler.copyFailed(left, host, connectSucceeded);
- }
- remaining.clear();
+ LOG.warn("Failed to connect to " + host + " with " + remaining.size() + " inputs", ie);
+ connectionErrs.increment(1);
} else {
- // If we got a read error at this stage, it implies there was a problem
- // with the first map, typically lost map. So, penalize only that map
- // and add the rest
- InputAttemptIdentifier firstMap = srcAttempts.get(0);
- scheduler.copyFailed(firstMap, host, connectSucceeded);
- remaining.remove(firstMap);
+ LOG.warn("Failed to verify reply after connecting to " + host + " with " + remaining.size()
+ + " inputs pending", ie);
}
+
+ // At this point, either the connection failed, or the initial header verification failed.
+ // The error does not relate to any specific Input. Report all of them as failed.
- // Add back all the remaining maps, WITHOUT marking them as failed
+ // This ends up indirectly penalizing the host (multiple failures reported on the single host)
+
for(InputAttemptIdentifier left: remaining) {
- // TODO Should the first one be skipped ?
- scheduler.putBackKnownMapOutput(host, left);
+ // Need to be handling temporary glitches ..
+ scheduler.copyFailed(left, host, !connectSucceeded);
}
-
+
+ // Add back all remaining maps - which at this point is ALL MAPS the
+ // Fetcher was started with. The Scheduler takes care of retries,
+ // reporting too many failures etc.
+ putBackRemainingMapOutputs(host);
return;
}
@@ -339,7 +335,6 @@ class Fetcher extends Thread {
LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
for(InputAttemptIdentifier left: failedTasks) {
scheduler.copyFailed(left, host, true);
- remaining.remove(left);
}
}
@@ -351,12 +346,27 @@ class Fetcher extends Thread {
+ remaining.size() + " left.");
}
} finally {
- for (InputAttemptIdentifier left : remaining) {
- scheduler.putBackKnownMapOutput(host, left);
- }
+ putBackRemainingMapOutputs(host);
}
}
+ private void putBackRemainingMapOutputs(MapHost host) {
+ // Cycle through remaining MapOutputs
+ boolean isFirst = true;
+ InputAttemptIdentifier first = null;
+ for (InputAttemptIdentifier left : remaining) {
+ if (isFirst) {
+ first = left;
+ isFirst = false;
+ continue;
+ }
+ scheduler.putBackKnownMapOutput(host, left);
+ }
+ if (first != null) { // Empty remaining list.
+ scheduler.putBackKnownMapOutput(host, first);
+ }
+ }
+
private static InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0];
private InputAttemptIdentifier[] copyMapOutput(MapHost host,
@@ -388,7 +398,7 @@ class Fetcher extends Thread {
LOG.warn("Invalid map id ", e);
// Don't know which one was bad, so consider this one bad and dont read
// the remaining because we dont know where to start reading from. YARN-1773
- return new InputAttemptIdentifier[] {srcAttemptId = getNextRemainingAttempt()};
+ return new InputAttemptIdentifier[] {getNextRemainingAttempt()};
}
@@ -481,14 +491,16 @@ class Fetcher extends Thread {
decompressedLength);
return false;
}
-
-// if (forReduce < reduceStartId || forReduce >= reduceStartId+reduceRange) {
-// wrongReduceErrs.increment(1);
-// LOG.warn(getName() + " data for the wrong reduce map: " +
-// srcAttemptId + " len: " + compressedLength + " decomp len: " +
-// decompressedLength + " for reduce " + forReduce);
-// return false;
-// }
+
+ // partitionId verification. Isn't available here because it is encoded into
+ // URI
+ if (forReduce != currentPartition) {
+ wrongReduceErrs.increment(1);
+ LOG.warn(getName() + " data for the wrong partition map: " + srcAttemptId + " len: "
+ + compressedLength + " decomp len: " + decompressedLength + " for partition " + forReduce
+ + ", expected partition: " + currentPartition);
+ return false;
+ }
// Sanity check
if (!remaining.contains(srcAttemptId)) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98b49f22/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index 583e1a1..2b940e4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -192,6 +192,8 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
if (runShuffleFuture == null) {
return false;
}
+ // TODO This may return true, followed by the reader throwing the actual Exception.
+ // Fix as part of TEZ-919.
return runShuffleFuture.isDone();
//return scheduler.isDone() && merger.isMergeComplete();
}
@@ -249,6 +251,8 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
synchronized (this) {
if (throwable != null) {
+ // This exception will show up when someone tries iterating through the fetched Input.
+ // As part of TEZ-919, report this early.
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98b49f22/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index b33b838..1b34430 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -58,7 +58,7 @@ class ShuffleScheduler {
private static final Log LOG = LogFactory.getLog(ShuffleScheduler.class);
private static final int MAX_MAPS_AT_ONCE = 20;
- private static final long INITIAL_PENALTY = 10000;
+ private static final long INITIAL_PENALTY = 2000l; // 2 seconds
private static final float PENALTY_GROWTH_RATE = 1.3f;
// TODO NEWTEZ May need to be a string if attempting to fetch from multiple inputs.
@@ -80,7 +80,6 @@ class ShuffleScheduler {
new HashMap<String,IntWritable>();
private final TezInputContext inputContext;
private final Shuffle shuffle;
- private final int abortFailureLimit;
private final TezCounter shuffledMapsCounter;
private final TezCounter reduceShuffleBytes;
private final TezCounter reduceBytesDecompressed;
@@ -90,15 +89,15 @@ class ShuffleScheduler {
private final long startTime;
private long lastProgressTime;
-
- private int maxMapRuntime = 0;
- private int maxFailedUniqueFetches = 5;
+
private int maxFetchFailuresBeforeReporting;
-
+ private boolean reportReadErrorImmediately = true;
+ private int maxFailedUniqueFetches = 5;
+ private final int abortFailureLimit;
+ private int maxMapRuntime = 0;
+
private long totalBytesShuffledTillNow = 0;
private DecimalFormat mbpsFormat = new DecimalFormat("0.00");
-
- private boolean reportReadErrorImmediately = true;
public ShuffleScheduler(TezInputContext inputContext,
Configuration conf,
@@ -124,6 +123,7 @@ class ShuffleScheduler {
this.bytesShuffledToMem = bytesShuffledToMem;
this.startTime = System.currentTimeMillis();
this.lastProgressTime = startTime;
+
this.maxFailedUniqueFetches = Math.min(numberOfInputs,
this.maxFailedUniqueFetches);
referee.start();
@@ -135,6 +135,14 @@ class ShuffleScheduler {
conf.getBoolean(
TezJobConfig.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR,
TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR);
+
+ LOG.info("ShuffleScheduler running for sourceVertex: "
+ + inputContext.getSourceVertexName() + " with configuration: "
+ + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting
+ + ", reportReadErrorImmediately=" + reportReadErrorImmediately
+ + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches
+ + ", abortFailureLimit=" + abortFailureLimit
+ + ", maxMapRuntime=" + maxMapRuntime);
}
public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifier,
@@ -200,6 +208,8 @@ class ShuffleScheduler {
failureCounts.put(srcAttempt, new IntWritable(1));
}
String hostname = host.getHostName();
+ // TODO TEZ-922 hostFailures isn't really used for anything. Factor it into error
+ // reporting / potential blacklisting of hosts.
if (hostFailures.containsKey(hostname)) {
IntWritable x = hostFailures.get(hostname);
x.set(x.get() + 1);
@@ -207,6 +217,13 @@ class ShuffleScheduler {
hostFailures.put(hostname, new IntWritable(1));
}
if (failures >= abortFailureLimit) {
+ // This task has seen too many fetch failures - report it as failed. The
+ // AM may retry it if max failures has not been reached.
+
+ // Between the task and the AM - someone needs to determine who is at
+ // fault. If there's enough errors seen on the task, before the AM informs
+ // it about source failure, the task considers itself to have failed and
+ // allows the AM to re-schedule it.
IOException ioe = new IOException(failures
+ " failures downloading "
+ TezRuntimeUtils.getTaskAttemptIdentifier(
@@ -215,7 +232,8 @@ class ShuffleScheduler {
ioe.fillInStackTrace();
shuffle.reportException(ioe);
}
-
+
+ failedShuffleCounter.increment(1);
checkAndInformAM(failures, srcAttempt, readError);
checkReducerHealth();
@@ -223,12 +241,10 @@ class ShuffleScheduler {
long delay = (long) (INITIAL_PENALTY *
Math.pow(PENALTY_GROWTH_RATE, failures));
- penalties.add(new Penalty(host, delay));
-
- failedShuffleCounter.increment(1);
+ penalties.add(new Penalty(host, delay));
}
- // Notify the JobTracker
+ // Notify the AM
// after every read error, if 'reportReadErrorImmediately' is true or
// after every 'maxFetchFailuresBeforeReporting' failures
private void checkAndInformAM(
@@ -248,10 +264,9 @@ class ShuffleScheduler {
.getInputIndex(), srcAttempt.getAttemptNumber()));
inputContext.sendEvents(failedEvents);
- //status.addFailedDependency(mapId);
}
}
-
+
private void checkReducerHealth() {
final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
@@ -293,8 +308,10 @@ class ShuffleScheduler {
failureCounts.size() == (numInputs - doneMaps))
&& !reducerHealthy
&& (!reducerProgressedEnough || reducerStalled)) {
- LOG.fatal("Shuffle failed with too many fetch failures " +
- "and insufficient progress!");
+ LOG.fatal("Shuffle failed with too many fetch failures " + "and insufficient progress!"
+ + "failureCounts=" + failureCounts.size() + ", pendingInputs=" + (numInputs - doneMaps)
+ + ", reducerHealthy=" + reducerHealthy + ", reducerProgressedEnough="
+ + reducerProgressedEnough + ", reducerStalled=" + reducerStalled);
String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";
shuffle.reportException(new IOException(errorMsg));
}
@@ -368,23 +385,32 @@ class ShuffleScheduler {
public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
List<InputAttemptIdentifier> origList = host.getAndClearKnownMaps();
+
Map<Integer, InputAttemptIdentifier> dedupedList = new LinkedHashMap<Integer, InputAttemptIdentifier>();
Iterator<InputAttemptIdentifier> listItr = origList.iterator();
while (listItr.hasNext()) {
- // we may want to try all versions of the input but with current retry
+ // we may want to try all versions of the input but with current retry
// behavior older ones are likely to be lost and should be ignored.
// This may be removed after TEZ-914
InputAttemptIdentifier id = listItr.next();
- Integer inputNumber = new Integer(id.getInputIdentifier().getInputIndex());
- InputAttemptIdentifier oldId = dedupedList.get(inputNumber);
- if (oldId == null || oldId.getAttemptNumber() < id.getAttemptNumber()) {
- dedupedList.put(inputNumber, id);
- if (oldId != null) {
- LOG.warn("Ignoring older source: " + oldId +
- " in favor of newer source: " + id);
+ if (inputShouldBeConsumed(id)) {
+ Integer inputNumber = new Integer(id.getInputIdentifier().getInputIndex());
+ InputAttemptIdentifier oldId = dedupedList.get(inputNumber);
+ if (oldId == null || oldId.getAttemptNumber() < id.getAttemptNumber()) {
+ dedupedList.put(inputNumber, id);
+ if (oldId != null) {
+ LOG.warn("Old Src for InputIndex: " + inputNumber + " with attemptNumber: "
+ + oldId.getAttemptNumber()
+ + " was not determined to be invalid. Ignoring it for now in favour of "
+ + id.getAttemptNumber());
+ }
}
+ } else {
+ LOG.info("Ignoring finished or obsolete source: " + id);
}
}
+
+ // Compute the final list, limited by NUM_FETCHERS_AT_ONCE
List<InputAttemptIdentifier> result = new ArrayList<InputAttemptIdentifier>();
int includedMaps = 0;
int totalSize = dedupedList.size();
@@ -392,23 +418,16 @@ class ShuffleScheduler {
// find the maps that we still need, up to the limit
while (dedupedItr.hasNext()) {
InputAttemptIdentifier id = dedupedItr.next().getValue();
- if (inputShouldBeConsumed(id)) {
result.add(id);
if (++includedMaps >= MAX_MAPS_AT_ONCE) {
break;
}
- } else {
- LOG.info("Ignoring finished or obsolete source: " + id);
- }
}
+
// put back the maps left after the limit
while (dedupedItr.hasNext()) {
InputAttemptIdentifier id = dedupedItr.next().getValue();
- if (inputShouldBeConsumed(id)) {
- host.addKnownMap(id);
- } else {
- LOG.info("Ignoring finished or obsolete source: " + id);
- }
+ host.addKnownMap(id);
}
LOG.info("assigned " + includedMaps + " of " + totalSize + " to " +
host + " to " + Thread.currentThread().getName());
@@ -425,7 +444,7 @@ class ShuffleScheduler {
LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " +
(System.currentTimeMillis()-shuffleStart.get()) + "s");
}
-
+
public synchronized void resetKnownMaps() {
mapLocations.clear();
obsoleteInputs.clear();