You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/10/21 22:33:50 UTC
tez git commit: TEZ-2882. Consider improving fetch failure handling
(rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master a9cfeb914 -> 46f2454ac
TEZ-2882. Consider improving fetch failure handling (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/46f2454a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/46f2454a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/46f2454a
Branch: refs/heads/master
Commit: 46f2454acfb22eebd7937280854b2a8d6b5003b9
Parents: a9cfeb9
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Wed Oct 21 13:34:14 2015 -0700
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Wed Oct 21 13:34:14 2015 -0700
----------------------------------------------------------------------
.../library/api/TezRuntimeConfiguration.java | 108 ++++
.../orderedgrouped/FetcherOrderedGrouped.java | 10 +-
.../ShuffleInputEventHandlerOrderedGrouped.java | 9 +-
.../orderedgrouped/ShuffleScheduler.java | 462 ++++++++++----
.../library/input/OrderedGroupedKVInput.java | 14 +
.../shuffle/orderedgrouped/TestFetcher.java | 10 +-
...tShuffleInputEventHandlerOrderedGrouped.java | 22 +-
.../orderedgrouped/TestShuffleScheduler.java | 623 ++++++++++++++++++-
.../TestOrderedGroupedMergedKVInputConfig.java | 30 +
9 files changed, 1149 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index a84448f..cf05546 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -198,6 +198,106 @@ public class TezRuntimeConfiguration {
"shuffle.fetch.failures.limit";
public static final int TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT_DEFAULT = 5;
+ @Private
+ @Unstable
+ @ConfigurationProperty(type = "integer")
+ /**
+ * Expert setting made available only for debugging. Do not change it. Sets
+ * the number of retries before the consumer gives up on downloading from a
+ * source attempt. Code internally handles the threshold if set to -1.
+ */
+ public static final String
+ TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT =
+ TEZ_RUNTIME_PREFIX + "shuffle.src-attempt.abort.limit";
+ public static final int
+ TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT_DEFAULT = -1;
+
+ @Private
+ @Unstable
+ @ConfigurationProperty(type = "float")
+ /**
+ * Expert setting made available only for debugging. Do not change it. Setting
+ * to determine if failures happened across a percentage of nodes. This
+ * helps in determining if the consumer has to be restarted on continuous
+ * failures. Setting it to lower value can make consumer restarts more
+ * aggressive on failures.
+ */
+ public static final String
+ TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION =
+ TEZ_RUNTIME_PREFIX + "shuffle.acceptable.host-fetch.failure.fraction";
+ public static final float
+ TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION_DEFAULT = 0.2f;
+
+ @Private
+ @Unstable
+ @ConfigurationProperty(type = "integer")
+ /**
+ * Expert setting made available only for debugging. Do not change it. Setting
+ * to determine if the consumer has to be restarted on continuous
+ * failures across nodes. Used along with {@link
+ * TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION}.
+ */
+ public static final String
+ TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST =
+ TEZ_RUNTIME_PREFIX + "shuffle.min.failures.per.host";
+ public static final int TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST_DEFAULT = 4;
+
+ @Private
+ @Unstable
+ @ConfigurationProperty(type = "float")
+ /**
+ * Expert setting made available only for debugging. Do not change it.
+ * Maximum percentage of time (compared to overall progress), the fetcher is
+ * allowed before concluding that it is stalled.
+ */
+ public static final String TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION =
+ TEZ_RUNTIME_PREFIX + "shuffle.max.stall.time.fraction";
+ public static final float
+ TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION_DEFAULT = 0.5f;
+
+ @Private
+ @Unstable
+ @ConfigurationProperty(type = "float")
+ /**
+ * Expert setting made available only for debugging. Do not change it.
+ * Fraction to determine whether the shuffle has progressed enough or not.
+ * If it has not progressed enough, the consumer could qualify for a restart.
+ */
+ public static final String
+ TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION =
+ TEZ_RUNTIME_PREFIX + "shuffle.min.required.progress.fraction";
+ public static final float
+ TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION_DEFAULT = 0.5f;
+
+ @Private
+ @Unstable
+ @ConfigurationProperty(type = "float")
+ /**
+ * Expert setting made available only for debugging. Do not change it.
+ * Provides threshold for determining whether fetching has to be marked
+ * unhealthy based on the ratio of (failures/(failures+completed))
+ */
+ public static final String
+ TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION =
+ TEZ_RUNTIME_PREFIX + "shuffle.max.allowed.failed.fetch.fraction";
+ public static final float
+ TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION_DEFAULT = 0.5f;
+
+ @Private
+ @Unstable
+ @ConfigurationProperty(type = "boolean")
+ /**
+ * Expert setting made available only for debugging. Do not change it.
+ * Determines whether the failures observed since the last successful
+ * shuffle completion should be taken into account when deciding if
+ * fetching has to be marked unhealthy.
+ */
+ public static final String
+ TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION =
+ TEZ_RUNTIME_PREFIX + "shuffle.failed.check.since-last.completion";
+ public static final boolean
+ TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT = true;
+
+
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE =
TEZ_RUNTIME_PREFIX +
@@ -418,6 +518,14 @@ public class TezRuntimeConfiguration {
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM);
+ tezRuntimeKeys.add
+ (TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION);
+ tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST);
+ tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION);
+ tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT);
+ tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION);
+ tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION);
+ tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION);
tezRuntimeKeys.add(TEZ_RUNTIME_REPORT_PARTITION_STATS);
tezRuntimeKeys.add(TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT);
tezRuntimeKeys.add(TEZ_RUNTIME_GROUP_COMPARATOR_CLASS);
http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 1b4031d..93f083d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -297,7 +297,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
} else {
LOG.warn("copyMapOutput failed for tasks " + Arrays.toString(failedTasks));
for (InputAttemptIdentifier left : failedTasks) {
- scheduler.copyFailed(left, host, true, false);
+ scheduler.copyFailed(left, host, true, false, false);
}
}
}
@@ -359,7 +359,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
for (InputAttemptIdentifier left : remaining.values()) {
// Need to be handling temporary glitches ..
// Report read error to the AM to trigger source failure heuristics
- scheduler.copyFailed(left, host, connectSucceeded, !connectSucceeded);
+ scheduler.copyFailed(left, host, connectSucceeded, !connectSucceeded, false);
}
return false;
}
@@ -505,7 +505,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
retryStartTime = 0;
scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength,
- endTime - startTime, mapOutput);
+ endTime - startTime, mapOutput, false);
// Note successful shuffle
remaining.remove(srcAttemptId.toString());
metrics.successFetch();
@@ -667,7 +667,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord);
long endTime = System.currentTimeMillis();
scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(),
- indexRecord.getRawLength(), (endTime - startTime), mapOutput);
+ indexRecord.getRawLength(), (endTime - startTime), mapOutput, true);
iter.remove();
metrics.successFetch();
} catch (IOException e) {
@@ -677,7 +677,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
if (!stopped) {
metrics.failedFetch();
ioErrs.increment(1);
- scheduler.copyFailed(srcAttemptId, host, true, false);
+ scheduler.copyFailed(srcAttemptId, host, true, false, true);
LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
host.getHostIdentifier(), e);
} else {
http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index e0473b3..f8c9553 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -50,7 +50,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
private final ShuffleScheduler scheduler;
private final InputContext inputContext;
- private int maxMapRuntime = 0;
private final boolean sslShuffle;
private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
@@ -110,12 +109,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
+ ", attemptNum: " + dmEvent.getVersion() + ", payload: " +
ShuffleUtils.stringify(shufflePayload));
}
- // TODO NEWTEZ See if this duration hack can be removed.
- int duration = shufflePayload.getRunDuration();
- if (duration > maxMapRuntime) {
- maxMapRuntime = duration;
- scheduler.informMaxMapRunTime(maxMapRuntime);
- }
if (shufflePayload.hasEmptyPartitions()) {
try {
byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
@@ -128,7 +121,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
+ srcAttemptIdentifier + "]. Not fetching.");
}
numDmeEventsNoData.incrementAndGet();
- scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null);
+ scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null, true);
return;
}
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index f45ca35..22da46c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -46,6 +46,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -94,7 +95,7 @@ class ShuffleScheduler {
private final AtomicLong shuffleStart = new AtomicLong(0);
private static final Logger LOG = LoggerFactory.getLogger(ShuffleScheduler.class);
- private static final long INITIAL_PENALTY = 2000l; // 2 seconds
+ static final long INITIAL_PENALTY = 2000L; // 2 seconds
private static final float PENALTY_GROWTH_RATE = 1.3f;
private final BitSet finishedMaps;
@@ -103,22 +104,26 @@ class ShuffleScheduler {
@VisibleForTesting
final Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
//TODO Clean this and other maps at some point
- private final ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>();
+ @VisibleForTesting
+ final ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap
+ = new ConcurrentHashMap<String, InputAttemptIdentifier>();
//To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is
// enabled in source.
@VisibleForTesting
- final Map<InputIdentifier, ShuffleEventInfo> shuffleInfoEventsMap;
+ final Map<InputIdentifier, ShuffleEventInfo> pipelinedShuffleInfoEventsMap;
- private final Set<MapHost> pendingHosts = new HashSet<MapHost>();
+ @VisibleForTesting
+ final Set<MapHost> pendingHosts = new HashSet<MapHost>();
private final Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<InputAttemptIdentifier>();
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final Random random = new Random(System.currentTimeMillis());
private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
private final Referee referee;
- private final Map<InputAttemptIdentifier, IntWritable> failureCounts =
- new HashMap<InputAttemptIdentifier,IntWritable>();
+ @VisibleForTesting
+ final Map<InputAttemptIdentifier, IntWritable> failureCounts = new HashMap<InputAttemptIdentifier,IntWritable>();
+ final Set<String> uniqueHosts = Sets.newHashSet();
private final Map<String,IntWritable> hostFailures =
new HashMap<String,IntWritable>();
private final InputContext inputContext;
@@ -126,7 +131,8 @@ class ShuffleScheduler {
private final TezCounter skippedInputCounter;
private final TezCounter reduceShuffleBytes;
private final TezCounter reduceBytesDecompressed;
- private final TezCounter failedShuffleCounter;
+ @VisibleForTesting
+ final TezCounter failedShuffleCounter;
private final TezCounter bytesShuffledToDisk;
private final TezCounter bytesShuffledToDiskDirect;
private final TezCounter bytesShuffledToMem;
@@ -134,9 +140,13 @@ class ShuffleScheduler {
private final TezCounter lastEventReceived;
private final String srcNameTrimmed;
- private final AtomicInteger remainingMaps;
+ @VisibleForTesting
+ final AtomicInteger remainingMaps;
private final long startTime;
- private long lastProgressTime;
+ @VisibleForTesting
+ long lastProgressTime;
+ @VisibleForTesting
+ long failedShufflesSinceLastCompletion;
private final int numFetchers;
private final Set<FetcherOrderedGrouped> runningFetchers =
@@ -171,12 +181,18 @@ class ShuffleScheduler {
private final boolean reportReadErrorImmediately;
private final int maxFailedUniqueFetches;
private final int abortFailureLimit;
- private int maxMapRuntime = 0;
+
+ private final int minFailurePerHost;
+ private final float hostFailureFraction;
+ private final float maxStallTimeFraction;
+ private final float minReqProgressFraction;
+ private final float maxAllowedFailedFetchFraction;
+ private final boolean checkFailedFetchSinceLastCompletion;
private volatile Thread shuffleSchedulerThread = null;
private long totalBytesShuffledTillNow = 0;
- private DecimalFormat mbpsFormat = new DecimalFormat("0.00");
+ private final DecimalFormat mbpsFormat = new DecimalFormat("0.00");
public ShuffleScheduler(InputContext inputContext,
Configuration conf,
@@ -195,7 +211,15 @@ class ShuffleScheduler {
this.allocator = allocator;
this.mergeManager = mergeManager;
this.numInputs = numberOfInputs;
- abortFailureLimit = Math.max(30, numberOfInputs / 10);
+ int abortFailureLimitConf = conf.getInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT, TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT_DEFAULT);
+ if (abortFailureLimitConf <= -1) {
+ abortFailureLimit = Math.max(15, numberOfInputs / 10);
+ } else {
+ //No upper cap, as user is setting this intentionally
+ abortFailureLimit = abortFailureLimitConf;
+ }
remainingMaps = new AtomicInteger(numberOfInputs);
finishedMaps = new BitSet(numberOfInputs);
this.ifileReadAhead = ifileReadAhead;
@@ -211,6 +235,47 @@ class ShuffleScheduler {
localDiskFetchEnabled = conf.getBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
+
+ this.minFailurePerHost = conf.getInt(
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST_DEFAULT);
+ Preconditions.checkArgument(minFailurePerHost >= 0,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST
+ + "=" + minFailurePerHost + " should not be negative");
+
+ this.hostFailureFraction = conf.getFloat(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION,
+ TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION_DEFAULT);
+
+ this.maxStallTimeFraction = conf.getFloat(
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION_DEFAULT);
+ Preconditions.checkArgument(maxStallTimeFraction >= 0,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION
+ + "=" + maxStallTimeFraction + " should not be negative");
+
+ this.minReqProgressFraction = conf.getFloat(
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION,
+ TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION_DEFAULT);
+ Preconditions.checkArgument(minReqProgressFraction >= 0,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION
+ + "=" + minReqProgressFraction + " should not be negative");
+
+ this.maxAllowedFailedFetchFraction = conf.getFloat(
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION_DEFAULT);
+ Preconditions.checkArgument(maxAllowedFailedFetchFraction >= 0,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION
+ + "=" + maxAllowedFailedFetchFraction + " should not be negative");
+
+ this.checkFailedFetchSinceLastCompletion = conf.getBoolean
+ (TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION,
+ TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT);
+
this.localHostname = inputContext.getExecutionContext().getHostName();
final ByteBuffer shuffleMetadata =
inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
@@ -284,16 +349,22 @@ class ShuffleScheduler {
this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
- shuffleInfoEventsMap = new HashMap<InputIdentifier, ShuffleEventInfo>();
+ pipelinedShuffleInfoEventsMap = new HashMap<InputIdentifier, ShuffleEventInfo>();
LOG.info("ShuffleScheduler running for sourceVertex: "
+ inputContext.getSourceVertexName() + " with configuration: "
+ "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting
+ ", reportReadErrorImmediately=" + reportReadErrorImmediately
+ ", maxFailedUniqueFetches=" + maxFailedUniqueFetches
+ ", abortFailureLimit=" + abortFailureLimit
- + ", maxMapRuntime=" + maxMapRuntime
+ ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce
- + ", numFetchers=" + numFetchers);
+ + ", numFetchers=" + numFetchers
+ + ", hostFailureFraction=" + hostFailureFraction
+ + ", minFailurePerHost=" + minFailurePerHost
+ + ", maxAllowedFailedFetchFraction=" + maxAllowedFailedFetchFraction
+ + ", maxStallTimeFraction=" + maxStallTimeFraction
+ + ", minReqProgressFraction=" + minReqProgressFraction
+ + ", checkFailedFetchSinceLastCompletion=" + checkFailedFetchSinceLastCompletion
+ );
}
public void start() throws Exception {
@@ -391,10 +462,17 @@ class ShuffleScheduler {
long bytesCompressed,
long bytesDecompressed,
long millis,
- MapOutput output
+ MapOutput output,
+ boolean isLocalFetch
) throws IOException {
if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
+ if (!isLocalFetch) {
+ /**
+ * Reset it only when it is a non-local-disk copy.
+ */
+ failedShufflesSinceLastCompletion = 0;
+ }
if (output != null) {
failureCounts.remove(srcAttemptIdentifier);
@@ -435,12 +513,12 @@ class ShuffleScheduler {
return;
}
- ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(inputIdentifier);
+ ShuffleEventInfo eventInfo = pipelinedShuffleInfoEventsMap.get(inputIdentifier);
//Possible that Shuffle event handler invoked this, due to empty partitions
if (eventInfo == null && output == null) {
eventInfo = new ShuffleEventInfo(srcAttemptIdentifier);
- shuffleInfoEventsMap.put(inputIdentifier, eventInfo);
+ pipelinedShuffleInfoEventsMap.put(inputIdentifier, eventInfo);
}
assert(eventInfo != null);
@@ -455,10 +533,10 @@ class ShuffleScheduler {
if (eventInfo.isDone()) {
remainingMaps.decrementAndGet();
setInputFinished(inputIdentifier.getInputIndex());
- shuffleInfoEventsMap.remove(inputIdentifier);
+ pipelinedShuffleInfoEventsMap.remove(inputIdentifier);
if (LOG.isTraceEnabled()) {
LOG.trace("Removing : " + srcAttemptIdentifier + ", pending: " +
- shuffleInfoEventsMap);
+ pipelinedShuffleInfoEventsMap);
}
}
@@ -486,7 +564,8 @@ class ShuffleScheduler {
}
} else {
// input is already finished. duplicate fetch.
- LOG.warn("Duplicate fetch of input no longer needs to be fetched: " + srcAttemptIdentifier);
+ LOG.warn("Duplicate fetch of input no longer needs to be fetched: "
+ + srcAttemptIdentifier);
// free the resource - specially memory
// If the src does not generate data, output will be null.
@@ -501,7 +580,7 @@ class ShuffleScheduler {
//For pipelined shuffle.
//TODO: TEZ-2132 for error handling. As of now, fail fast if there is a different attempt
if (input.canRetrieveInputInChunks()) {
- ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(input.getInputIdentifier());
+ ShuffleEventInfo eventInfo = pipelinedShuffleInfoEventsMap.get(input.getInputIdentifier());
if (eventInfo != null && input.getAttemptNumber() != eventInfo.attemptNum) {
reportExceptionForInput(new IOException("Previous event already got scheduled for " +
input + ". Previous attempt's data could have been already merged "
@@ -512,7 +591,8 @@ class ShuffleScheduler {
}
if (eventInfo == null) {
- shuffleInfoEventsMap.put(input.getInputIdentifier(), new ShuffleEventInfo(input));
+ pipelinedShuffleInfoEventsMap.put(input.getInputIdentifier(),
+ new ShuffleEventInfo(input));
}
}
return true;
@@ -543,95 +623,264 @@ class ShuffleScheduler {
public synchronized void copyFailed(InputAttemptIdentifier srcAttempt,
MapHost host,
boolean readError,
- boolean connectError) {
- host.penalize();
- int failures = 1;
- if (failureCounts.containsKey(srcAttempt)) {
- IntWritable x = failureCounts.get(srcAttempt);
- x.set(x.get() + 1);
- failures = x.get();
- } else {
- failureCounts.put(srcAttempt, new IntWritable(1));
+ boolean connectError,
+ boolean isLocalFetch) {
+ failedShuffleCounter.increment(1);
+
+ int failures = incrementAndGetFailureAttempt(srcAttempt);
+
+ if (!isLocalFetch) {
+ /**
+ * Track the number of failures that has happened since last completion.
+ * This gets reset on a successful copy.
+ */
+ failedShufflesSinceLastCompletion++;
}
- String hostPort = host.getHostIdentifier();
- // TODO TEZ-922 hostFailures isn't really used for anything. Factor it into error
- // reporting / potential blacklisting of hosts.
- if (hostFailures.containsKey(hostPort)) {
- IntWritable x = hostFailures.get(hostPort);
- x.set(x.get() + 1);
- } else {
- hostFailures.put(hostPort, new IntWritable(1));
+
+ /**
+ * Inform AM:
+ * - In case of read/connect error
+ * - In case attempt failures exceed threshold of
+ * maxFetchFailuresBeforeReporting (5)
+ * Bail-out if needed:
+ * - Check whether individual attempt crossed failure threshold limits
+ * - Check overall shuffle health. Bail out if needed.*
+ */
+
+ //TEZ-2890
+ boolean shouldInformAM =
+ (reportReadErrorImmediately && (readError || connectError))
+ || ((failures % maxFetchFailuresBeforeReporting) == 0);
+
+ if (shouldInformAM) {
+ //Inform AM. In case producer needs to be restarted, it is handled at AM.
+ informAM(srcAttempt);
+ }
+
+ //Restart consumer in case shuffle is not healthy
+ if (!isShuffleHealthy(srcAttempt)) {
+ return;
}
- if (failures >= abortFailureLimit) {
+
+ penalizeHost(host, failures);
+ }
+
+ private boolean isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) {
+ int attemptFailures = getFailureCount(srcAttempt);
+ if (attemptFailures >= abortFailureLimit) {
// This task has seen too many fetch failures - report it as failed. The
// AM may retry it if max failures has not been reached.
-
+
// Between the task and the AM - someone needs to determine who is at
// fault. If there's enough errors seen on the task, before the AM informs
// it about source failure, the task considers itself to have failed and
// allows the AM to re-schedule it.
- IOException ioe = new IOException(failures
- + " failures downloading "
- + TezRuntimeUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
- srcAttempt.getAttemptNumber()));
- ioe.fillInStackTrace();
+ String errorMsg = "Failed " + attemptFailures + " times trying to "
+ + "download from " + TezRuntimeUtils.getTaskAttemptIdentifier(
+ inputContext.getSourceVertexName(),
+ srcAttempt.getInputIdentifier().getInputIndex(),
+ srcAttempt.getAttemptNumber()) + ". threshold=" + abortFailureLimit;
+ IOException ioe = new IOException(errorMsg);
// Shuffle knows how to deal with failures post shutdown via the onFailure hook
exceptionReporter.reportException(ioe);
+ return true;
}
+ return false;
+ }
- failedShuffleCounter.increment(1);
- checkAndInformAM(failures, srcAttempt, readError, connectError);
+ private void penalizeHost(MapHost host, int failures) {
+ host.penalize();
+
+ String hostPort = host.getHostIdentifier();
+ // TODO TEZ-922 hostFailures isn't really used for anything apart from
+ // hasFailedAcrossNodes().Factor it into error
+ // reporting / potential blacklisting of hosts.
+ if (hostFailures.containsKey(hostPort)) {
+ IntWritable x = hostFailures.get(hostPort);
+ x.set(x.get() + 1);
+ } else {
+ hostFailures.put(hostPort, new IntWritable(1));
+ }
- checkReducerHealth();
-
long delay = (long) (INITIAL_PENALTY *
Math.pow(PENALTY_GROWTH_RATE, failures));
-
penalties.add(new Penalty(host, delay));
}
+ private int getFailureCount(InputAttemptIdentifier srcAttempt) {
+ IntWritable failureCount = failureCounts.get(srcAttempt);
+ return (failureCount == null) ? 0 : failureCount.get();
+ }
+
+ private int incrementAndGetFailureAttempt(InputAttemptIdentifier srcAttempt) {
+ int failures = 1;
+ if (failureCounts.containsKey(srcAttempt)) {
+ IntWritable x = failureCounts.get(srcAttempt);
+ x.set(x.get() + 1);
+ failures = x.get();
+ } else {
+ failureCounts.put(srcAttempt, new IntWritable(1));
+ }
+ return failures;
+ }
+
public void reportLocalError(IOException ioe) {
- LOG.error(srcNameTrimmed + ": " + "Shuffle failed : caused by local error", ioe);
+ LOG.error(srcNameTrimmed + ": " + "Shuffle failed : caused by local error",
+ ioe);
// Shuffle knows how to deal with failures post shutdown via the onFailure hook
exceptionReporter.reportException(ioe);
}
- // Notify the AM
- // after every read error, if 'reportReadErrorImmediately' is true or
- // after every 'maxFetchFailuresBeforeReporting' failures
- private void checkAndInformAM(
- int failures, InputAttemptIdentifier srcAttempt, boolean readError,
- boolean connectError) {
- if ((reportReadErrorImmediately && (readError || connectError))
- || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
- LOG.info(srcNameTrimmed + ": " + "Reporting fetch failure for InputIdentifier: "
- + srcAttempt + " taskAttemptIdentifier: "
- + TezRuntimeUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
- srcAttempt.getAttemptNumber()) + " to AM.");
- List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
- failedEvents.add(InputReadErrorEvent.create("Fetch failure for "
- + TezRuntimeUtils.getTaskAttemptIdentifier(
- inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
- srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt.getInputIdentifier()
- .getInputIndex(), srcAttempt.getAttemptNumber()));
-
- inputContext.sendEvents(failedEvents);
+ // Notify AM
+ private void informAM(InputAttemptIdentifier srcAttempt) {
+ LOG.info(
+ srcNameTrimmed + ": " + "Reporting fetch failure for InputIdentifier: "
+ + srcAttempt + " taskAttemptIdentifier: " + TezRuntimeUtils
+ .getTaskAttemptIdentifier(inputContext.getSourceVertexName(),
+ srcAttempt.getInputIdentifier().getInputIndex(),
+ srcAttempt.getAttemptNumber()) + " to AM.");
+ List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
+ failedEvents.add(InputReadErrorEvent.create(
+ "Fetch failure for " + TezRuntimeUtils
+ .getTaskAttemptIdentifier(inputContext.getSourceVertexName(),
+ srcAttempt.getInputIdentifier().getInputIndex(),
+ srcAttempt.getAttemptNumber()) + " to jobtracker.",
+ srcAttempt.getInputIdentifier().getInputIndex(),
+ srcAttempt.getAttemptNumber()));
+
+ inputContext.sendEvents(failedEvents);
+ }
+
+ /**
+ * To determine if failures happened across nodes or not. This will help in
+ * determining whether this task needs to be restarted or source needs to
+ * be restarted.
+ *
+ * @param logContext context info for logging
+ * @return boolean true indicates this task needs to be restarted
+ */
+ private boolean hasFailedAcrossNodes(String logContext) {
+ int numUniqueHosts = uniqueHosts.size();
+ Preconditions.checkArgument(numUniqueHosts > 0, "No values in unique hosts");
+ int threshold = Math.max(3,
+ (int) Math.ceil(numUniqueHosts * hostFailureFraction));
+ int total = 0;
+ boolean failedAcrossNodes = false;
+ for(String host : uniqueHosts) {
+ IntWritable failures = hostFailures.get(host);
+ if (failures != null && failures.get() > minFailurePerHost) {
+ total++;
+ failedAcrossNodes = (total > (threshold * minFailurePerHost));
+ if (failedAcrossNodes) {
+ break;
+ }
+ }
+ }
+
+ LOG.info(logContext + ", numUniqueHosts=" + numUniqueHosts
+ + ", hostFailureThreshold=" + threshold
+ + ", hostFailuresCount=" + hostFailures.size()
+ + ", hosts crossing threshold=" + total
+ + ", reducerFetchIssues=" + failedAcrossNodes
+ );
+
+ return failedAcrossNodes;
+ }
+
+ private boolean allEventsReceived() {
+ if (!pipelinedShuffleInfoEventsMap.isEmpty()) {
+ return (pipelinedShuffleInfoEventsMap.size() == numInputs);
+ } else {
+ //no pipelining
+ return ((pathToIdentifierMap.size() + skippedInputCounter.getValue())
+ == numInputs);
}
}
- private void checkReducerHealth() {
- final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
- final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
- final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
+ /**
+ * Check if consumer needs to be restarted based on total failures w.r.t
+ * completed outputs and based on number of errors that have happened since
+ * last successful completion. Consider into account whether failures have
+ * been seen across different nodes.
+ *
+ * @return true to indicate fetchers are healthy
+ */
+ private boolean isFetcherHealthy(String logContext) {
long totalFailures = failedShuffleCounter.getValue();
int doneMaps = numInputs - remainingMaps.get();
-
- boolean reducerHealthy =
- (((float)totalFailures / (totalFailures + doneMaps))
- < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
+
+ boolean fetcherHealthy = true;
+ if (doneMaps > 0) {
+ fetcherHealthy = (((float) totalFailures / (totalFailures + doneMaps))
+ < maxAllowedFailedFetchFraction);
+ }
+
+ if (fetcherHealthy) {
+ //Compute this logic only when all events are received
+ if (allEventsReceived()) {
+ if (hostFailureFraction > 0) {
+ boolean failedAcrossNodes = hasFailedAcrossNodes(logContext);
+ if (failedAcrossNodes) {
+ return false; //not healthy
+ }
+ }
+
+ if (checkFailedFetchSinceLastCompletion) {
+ /**
+ * remainingMaps works better instead of pendingHosts in the
+ * following condition because of the way the fetcher reports failures
+ */
+ if (failedShufflesSinceLastCompletion >=
+ remainingMaps.get() * minFailurePerHost) {
+ /**
+ * Check if lots of errors are seen after last progress time.
+ *
+ * E.g totalFailures = 20. doneMaps = 320 - 300;
+ * fetcherHealthy = (20/(20+300)) < 0.5. So reducer would be marked as healthy.
+ * Assume 20 errors happen when downloading the last 20 attempts. Host failure & individual
+ * attempt failures would keep increasing, but only at a very slow rate (15 * 180 seconds per
+ * attempt) before the issue is detected.
+ *
+ * Instead consider the new errors with the pending items to be fetched.
+ * Assume 21 new errors happened after last progress; remainingMaps = (320-300) = 20;
+ * (21 / (21 + 20)) > 0.5
+ * So we reset the reducer to unhealthy here (special case)
+ *
+ * In normal conditions (i.e happy path), this wouldn't even cause any issue as
+ * failedShufflesSinceLastCompletion is reset as soon as we see successful download.
+ */
+
+ fetcherHealthy =
+ (((float) failedShufflesSinceLastCompletion / (
+ failedShufflesSinceLastCompletion + remainingMaps.get()))
+ < maxAllowedFailedFetchFraction);
+
+ LOG.info(logContext + ", fetcherHealthy=" + fetcherHealthy
+ + ", failedShufflesSinceLastCompletion="
+ + failedShufflesSinceLastCompletion
+ + ", remainingMaps=" + remainingMaps.get()
+ );
+ }
+ }
+ }
+ }
+ return fetcherHealthy;
+ }
+
+ boolean isShuffleHealthy(InputAttemptIdentifier srcAttempt) {
+
+ if (isAbortLimitExceeedFor(srcAttempt)) {
+ return false;
+ }
+
+ final float MIN_REQUIRED_PROGRESS_PERCENT = minReqProgressFraction;
+ final float MAX_ALLOWED_STALL_TIME_PERCENT = maxStallTimeFraction;
+
+ int doneMaps = numInputs - remainingMaps.get();
+
+ String logContext = "srcAttempt=" + srcAttempt.toString();
+ boolean fetcherHealthy = isFetcherHealthy(logContext);
// check if the reducer has progressed enough
boolean reducerProgressedEnough =
@@ -647,31 +896,31 @@ class ShuffleScheduler {
int shuffleProgressDuration =
(int)(lastProgressTime - startTime);
- // min time the reducer should run without getting killed
- int minShuffleRunDuration =
- (shuffleProgressDuration > maxMapRuntime)
- ? shuffleProgressDuration
- : maxMapRuntime;
-
- boolean reducerStalled =
- (((float)stallDuration / minShuffleRunDuration)
+ boolean reducerStalled = (shuffleProgressDuration > 0) &&
+ (((float)stallDuration / shuffleProgressDuration)
>= MAX_ALLOWED_STALL_TIME_PERCENT);
// kill if not healthy and has insufficient progress
if ((failureCounts.size() >= maxFailedUniqueFetches ||
failureCounts.size() == (numInputs - doneMaps))
- && !reducerHealthy
+ && !fetcherHealthy
&& (!reducerProgressedEnough || reducerStalled)) {
- LOG.error(srcNameTrimmed + ": " + "Shuffle failed with too many fetch failures " +
- "and insufficient progress!"
- + "failureCounts=" + failureCounts.size() + ", pendingInputs=" + (numInputs - doneMaps)
- + ", reducerHealthy=" + reducerHealthy + ", reducerProgressedEnough="
- + reducerProgressedEnough + ", reducerStalled=" + reducerStalled);
- String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";
+ String errorMsg = (srcNameTrimmed + ": "
+ + "Shuffle failed with too many fetch failures and insufficient progress!"
+ + "failureCounts=" + failureCounts.size()
+ + ", pendingInputs=" + (numInputs - doneMaps)
+ + ", fetcherHealthy=" + fetcherHealthy
+ + ", reducerProgressedEnough=" + reducerProgressedEnough
+ + ", reducerStalled=" + reducerStalled);
+ LOG.error(errorMsg);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Host failures=" + hostFailures.keySet());
+ }
// Shuffle knows how to deal with failures post shutdown via the onFailure hook
exceptionReporter.reportException(new IOException(errorMsg));
+ return false;
}
-
+ return true;
}
public synchronized void addKnownMapOutput(String inputHostName,
@@ -680,6 +929,7 @@ class ShuffleScheduler {
String hostUrl,
InputAttemptIdentifier srcAttempt) {
String hostPort = (inputHostName + ":" + String.valueOf(port));
+ uniqueHosts.add(hostPort);
String identifier = MapHost.createIdentifier(hostPort, partitionId);
@@ -697,7 +947,8 @@ class ShuffleScheduler {
host.addKnownMap(srcAttempt);
pathToIdentifierMap.put(
- getIdentifierFromPathAndReduceId(srcAttempt.getPathComponent(), partitionId), srcAttempt);
+ getIdentifierFromPathAndReduceId(srcAttempt.getPathComponent(),
+ partitionId), srcAttempt);
// Mark the host as pending
if (host.getState() == MapHost.State.PENDING) {
@@ -709,8 +960,8 @@ class ShuffleScheduler {
public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) {
// The incoming srcAttempt does not contain a path component.
LOG.info(srcNameTrimmed + ": " + "Adding obsolete input: " + srcAttempt);
- if (shuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) {
- //Pipelined shuffle case (where shuffleInfoEventsMap gets populated).
+ if (pipelinedShuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) {
+ //Pipelined shuffle case (where pipelinedShuffleInfoEventsMap gets populated).
//Fail fast here.
exceptionReporter.reportException(new IOException(srcAttempt + " is marked as obsoleteInput, but it "
+ "exists in shuffleInfoEventMap. Some data could have been already merged "
@@ -935,12 +1186,7 @@ class ShuffleScheduler {
}
}
- public synchronized void informMaxMapRunTime(int duration) {
- if (duration > maxMapRuntime) {
- maxMapRuntime = duration;
- }
- }
-
+
void setInputFinished(int inputIndex) {
synchronized(finishedMaps) {
finishedMaps.set(inputIndex, true);
http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 880dc2f..39cc471 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -355,6 +355,20 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM);
+ confKeys.add(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT);
+ confKeys.add(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION);
+ confKeys.add(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST);
+ confKeys.add(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION);
+ confKeys.add(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION);
+ confKeys.add(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION);
+ confKeys.add(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS);
http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index ceb0266..faa2d31 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -278,7 +278,7 @@ public class TestFetcher {
}).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId()));
doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class),
- anyLong(), anyLong(), anyLong(), any(MapOutput.class));
+ anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean());
doNothing().when(scheduler).putBackKnownMapOutput(host,
srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
doNothing().when(scheduler).putBackKnownMapOutput(host,
@@ -290,8 +290,8 @@ public class TestFetcher {
for (int i : sucessfulAttemptsIndexes) {
verifyCopySucceeded(scheduler, host, srcAttempts, i);
}
- verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true, false);
- verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true, false);
+ verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true, false, true);
+ verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true, false, true);
verify(metrics, times(3)).successFetch();
verify(metrics, times(2)).failedFetch();
@@ -309,7 +309,7 @@ public class TestFetcher {
String filenameToMatch = SHUFFLE_INPUT_FILE_PREFIX + srcAttemptToMatch.getPathComponent();
ArgumentCaptor<MapOutput> captureMapOutput = ArgumentCaptor.forClass(MapOutput.class);
verify(scheduler).copySucceeded(eq(srcAttemptToMatch), eq(host), eq(p * 100),
- eq(p * 1000), anyLong(), captureMapOutput.capture());
+ eq(p * 1000), anyLong(), captureMapOutput.capture(), anyBoolean());
// cannot use the equals of MapOutput as it compares id which is private. so doing it manually
MapOutput m = captureMapOutput.getAllValues().get(0);
@@ -414,7 +414,7 @@ public class TestFetcher {
verify(fetcher, times(2)).setupConnection(any(MapHost.class), any(Collection.class));
//since copyMapOutput consistently fails, it should call copyFailed once
verify(scheduler, times(1)).copyFailed(any(InputAttemptIdentifier.class), any(MapHost.class),
- anyBoolean(), anyBoolean());
+ anyBoolean(), anyBoolean(), anyBoolean());
verify(fetcher, times(1)).putBackRemainingMapOutputs(any(MapHost.class));
verify(scheduler, times(3)).putBackKnownMapOutput(any(MapHost.class),
http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index 78d214c..88a1d20 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -171,7 +171,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
int partitionId = attemptNum;
verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id1));
- verify(scheduler).shuffleInfoEventsMap.containsKey(id1.getInputIdentifier());
+ verify(scheduler).pipelinedShuffleInfoEventsMap.containsKey(id1.getInputIdentifier());
//Send final_update event.
Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, false, 1);
@@ -181,9 +181,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
handler.handleEvents(Collections.singletonList(dme2));
baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
partitionId = attemptNum;
- assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
+ assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id2));
- assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
+ assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
MapHost host = scheduler.getHost();
assertTrue(host != null);
@@ -191,10 +191,10 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
assertTrue(!list.isEmpty());
//Let the final_update event pass
MapOutput output = MapOutput.createMemoryMapOutput(id2, mergeManager, 1000, true);
- scheduler.copySucceeded(id2, host, 1000, 10000, 10000, output);
+ scheduler.copySucceeded(id2, host, 1000, 10000, 10000, output, false);
assertTrue(!scheduler.isDone()); //we haven't downloaded id1 yet
output = MapOutput.createMemoryMapOutput(id1, mergeManager, 1000, true);
- scheduler.copySucceeded(id1, host, 1000, 10000, 10000, output);
+ scheduler.copySucceeded(id1, host, 1000, 10000, 10000, output, false);
assertTrue(!scheduler.isDone()); //we haven't downloaded another source yet
//Send events for source 2
@@ -210,11 +210,11 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
InputAttemptIdentifier id4 = new InputAttemptIdentifier(new InputIdentifier(inputIdx),
attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1);
assertTrue(!scheduler.isInputFinished(id4.getInputIdentifier().getInputIndex()));
- scheduler.copySucceeded(id4, null, 0, 0, 0, null);
+ scheduler.copySucceeded(id4, null, 0, 0, 0, null, false);
assertTrue(!scheduler.isDone()); //we haven't downloaded another id yet
//Let the incremental event pass
output = MapOutput.createMemoryMapOutput(id3, mergeManager, 1000, true);
- scheduler.copySucceeded(id3, host, 1000, 10000, 10000, output);
+ scheduler.copySucceeded(id3, host, 1000, 10000, 10000, output, false);
assertTrue(scheduler.isDone());
}
@@ -234,7 +234,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(baseUri), eq(id1));
assertTrue("Shuffle info events should not be empty for pipelined shuffle",
- !scheduler.shuffleInfoEventsMap.isEmpty());
+ !scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
//Attempt #0 comes up. When processing this, it should report exception
attemptNum = 0;
@@ -263,7 +263,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId),
eq(baseUri), eq(expectedIdentifier));
assertTrue("Shuffle info events should be empty for regular shuffle codepath",
- scheduler.shuffleInfoEventsMap.isEmpty());
+ scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
}
@Test(timeout = 5000)
@@ -288,7 +288,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
handler.handleEvents(events);
InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0l),
- eq(0l), eq(0l), any(MapOutput.class));
+ eq(0l), eq(0l), any(MapOutput.class), eq(true));
}
@Test(timeout = 5000)
@@ -302,7 +302,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
handler.handleEvents(events);
InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0l),
- eq(0l), eq(0l), any(MapOutput.class));
+ eq(0l), eq(0l), any(MapOutput.class), eq(true));
}
@Test(timeout = 5000)
http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index ac6c6c0..3fe540c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -14,10 +14,16 @@
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.junit.Assert.assertEquals;
@@ -143,7 +149,7 @@ public class TestShuffleScheduler {
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(identifiers[i], mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
- scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput);
+ scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput, false);
scheduler.freeHost(mapHosts[i]);
}
@@ -155,6 +161,617 @@ public class TestShuffleScheduler {
}
}
+ @Test(timeout = 60000)
+ /**
+ * Scenario
+ * - reducer has not progressed enough
+ * - reducer becomes unhealthy after some failures
+ * - no of attempts failing exceeds maxFailedUniqueFetches (5)
+ * Expected result
+ * - fail the reducer
+ */
+ public void testReducerHealth_1() throws IOException {
+ Configuration conf = new TezConfiguration();
+ _testReducerHealth_1(conf);
+ conf.setInt(
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, 4000);
+ _testReducerHealth_1(conf);
+ }
+
+ public void _testReducerHealth_1(Configuration conf) throws IOException {
+ long startTime = System.currentTimeMillis() - 500000;
+ Shuffle shuffle = mock(Shuffle.class);
+ final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320,
+ shuffle, conf);
+
+ int totalProducerNodes = 20;
+
+ //Generate 320 events
+ for (int i = 0; i < 320; i++) {
+ InputAttemptIdentifier inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
+ 10000, i, "hostUrl", inputAttemptIdentifier);
+ }
+
+ //100 succeeds
+ for (int i = 0; i < 100; i++) {
+ InputAttemptIdentifier inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ MapOutput mapOutput = MapOutput
+ .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
+ 100, false);
+ scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+ }
+
+ //99 fails
+ for (int i = 100; i < 199; i++) {
+ InputAttemptIdentifier inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+ }
+
+
+ InputAttemptIdentifier inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(200), 0, "attempt_");
+
+ //Should fail here and report exception as reducer is not healthy
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(200, "host" + (200 %
+ totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+
+ int minFailurePerHost = conf.getInt(
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST_DEFAULT);
+
+ if (minFailurePerHost <= 4) {
+ //As per test threshold. Should fail & retrigger shuffle
+ verify(shuffle, atLeast(0)).reportException(any(Throwable.class));
+ } else if (minFailurePerHost > 100) {
+ //host failure is so high that this would not retrigger shuffle re-execution
+ verify(shuffle, atLeast(1)).reportException(any(Throwable.class));
+ }
+ }
+
+ @Test(timeout = 60000)
+ /**
+ * Scenario
+ * - reducer has progressed enough
+ * - failures start happening after that
+ * - no of attempts failing exceeds maxFailedUniqueFetches (5)
+ * - Has not stalled
+ * Expected result
+ * - Since reducer is not stalled, it should continue without error
+ *
+ * When reducer stalls, wait until enough retries are done and throw exception
+ *
+ */
+ public void testReducerHealth_2() throws IOException, InterruptedException {
+ long startTime = System.currentTimeMillis() - 500000;
+ Shuffle shuffle = mock(Shuffle.class);
+ final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle);
+
+ int totalProducerNodes = 20;
+
+ //Generate 0-200 events
+ for (int i = 0; i < 200; i++) {
+ InputAttemptIdentifier inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
+ 10000, i, "hostUrl", inputAttemptIdentifier);
+ }
+ assertEquals(320, scheduler.remainingMaps.get());
+
+ //Generate 200-320 events with empty partitions
+ for (int i = 200; i < 320; i++) {
+ InputAttemptIdentifier inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ scheduler.copySucceeded(inputAttemptIdentifier, null, 0, 0, 0, null, true);
+ }
+ //120 are successful. so remaining is 200
+ assertEquals(200, scheduler.remainingMaps.get());
+
+
+ //200 pending to be downloaded. Download 190.
+ for (int i = 0; i < 190; i++) {
+ InputAttemptIdentifier inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ MapOutput mapOutput = MapOutput
+ .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
+ 100, false);
+ scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+ }
+
+ assertEquals(10, scheduler.remainingMaps.get());
+
+ //10 fails
+ for (int i = 190; i < 200; i++) {
+ InputAttemptIdentifier inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+ }
+
+ //Shuffle has not stalled. so no issues.
+ verify(scheduler.reporter, times(0)).reportException(any(Throwable.class));
+
+ //stall shuffle
+ scheduler.lastProgressTime = System.currentTimeMillis() - 250000;
+
+ InputAttemptIdentifier inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(190), 0, "attempt_");
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(190, "host" +
+ (190 % totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+
+ //Even when it is stalled, need (320 - 300 = 20) * 3 = 60 failures
+ verify(scheduler.reporter, times(0)).reportException(any(Throwable.class));
+
+ assertEquals(11, scheduler.failedShufflesSinceLastCompletion);
+
+ //fail to download 50 more times across attempts
+ for (int i = 190; i < 200; i++) {
+ inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+ }
+
+ assertEquals(61, scheduler.failedShufflesSinceLastCompletion);
+ assertEquals(10, scheduler.remainingMaps.get());
+
+ verify(shuffle, atLeast(0)).reportException(any(Throwable.class));
+
+ //fail another 30
+ for (int i = 110; i < 120; i++) {
+ inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+ }
+
+ // Should fail now due to fetcherHealthy. (stall has already happened and
+ // these are the only pending tasks)
+ verify(shuffle, atLeast(1)).reportException(any(Throwable.class));
+ }
+
+
+
+ @Test(timeout = 60000)
+ /**
+ * Scenario
+ * - reducer has progressed enough
+ * - failures start happening after that in last fetch
+ * - no of attempts failing does not exceed maxFailedUniqueFetches (5)
+ * - Stalled
+ * Expected result
+ * - Since reducer is stalled and if failures haven't happened across nodes,
+ * it should be fine to proceed. AM would restart source task eventually.
+ *
+ */
+ public void testReducerHealth_3() throws IOException {
+ long startTime = System.currentTimeMillis() - 500000;
+ Shuffle shuffle = mock(Shuffle.class);
+ final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle);
+
+ int totalProducerNodes = 20;
+
+ //Generate 320 events
+ for (int i = 0; i < 320; i++) {
+ InputAttemptIdentifier inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
+ 10000, i, "hostUrl", inputAttemptIdentifier);
+ }
+
+ //319 succeeds
+ for (int i = 0; i < 319; i++) {
+ InputAttemptIdentifier inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ MapOutput mapOutput = MapOutput
+ .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
+ 100, false);
+ scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+ }
+
+ //1 fails (last fetch)
+ InputAttemptIdentifier inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(319), 0, "attempt_");
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+
+ //stall the shuffle
+ scheduler.lastProgressTime = System.currentTimeMillis() - 1000000;
+
+ assertEquals(scheduler.remainingMaps.get(), 1);
+
+ //Retry for 3 more times
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
+ totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
+ totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
+ totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+
+ // failedShufflesSinceLastCompletion has crossed the limits. Throw error
+ verify(shuffle, times(0)).reportException(any(Throwable.class));
+ }
+
+ @Test(timeout = 60000)
+ /**
+ * Scenario
+ * - reducer has progressed enough
+ * - failures have happened randomly in nodes, but tasks are completed
+ * - failures start happening after that in last fetch
+ * - no of attempts failing does not exceed maxFailedUniqueFetches (5)
+ * - Stalled
+ * Expected result
+ * - reducer is stalled. But since errors are not seen across multiple
+ * nodes, it is left to the AM to restart the producer. Do not kill consumer.
+ *
+ */
+ public void testReducerHealth_4() throws IOException {
+ long startTime = System.currentTimeMillis() - 500000;
+ Shuffle shuffle = mock(Shuffle.class);
+ final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle);
+
+ int totalProducerNodes = 20;
+
+ //Generate 320 events
+ for (int i = 0; i < 320; i++) {
+ InputAttemptIdentifier inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
+ 10000, i, "hostUrl", inputAttemptIdentifier);
+ }
+
+ //Tasks fail in 20% of nodes 3 times, but are able to proceed further
+ for (int i = 0; i < 64; i++) {
+ InputAttemptIdentifier inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i %
+ totalProducerNodes) + ":" + 10000, ""), false, true, false);
+
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i %
+ totalProducerNodes) + ":" + 10000, ""), false, true, false);
+
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i %
+ totalProducerNodes) + ":" + 10000, ""), false, true, false);
+
+ MapOutput mapOutput = MapOutput
+ .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
+ 100, false);
+ scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+ }
+
+ //319 succeeds
+ for (int i = 64; i < 319; i++) {
+ InputAttemptIdentifier inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+ MapOutput mapOutput = MapOutput
+ .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
+ 100, false);
+ scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+ + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+ }
+
+ //1 fails (last fetch)
+ InputAttemptIdentifier inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(319), 0, "attempt_");
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+
+ //stall the shuffle (but within limits)
+ scheduler.lastProgressTime = System.currentTimeMillis() - 100000;
+
+ assertEquals(scheduler.remainingMaps.get(), 1);
+
+ //Retry for 3 more times
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
+ totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
+ totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
+ totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+
+ // failedShufflesSinceLastCompletion has crossed the limits. 20% of other nodes had failures as
+ // well. However, it has failed only in one host. So this should proceed
+ // until AM decides to restart the producer.
+ verify(shuffle, times(0)).reportException(any(Throwable.class));
+
+ //stall the shuffle further, beyond the allowed stall-time limit
+ scheduler.lastProgressTime = System.currentTimeMillis() - 300000;
+ scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 %
+ totalProducerNodes)
+ + ":" + 10000, ""), false, true, false);
+ verify(shuffle, times(1)).reportException(any(Throwable.class));
+
+ }
+
+  @Test(timeout = 60000)
+  /**
+   * Scenario
+   *  - Shuffle has progressed enough
+   *  - Last event is yet to arrive
+   *  - Failures start happening after Shuffle has progressed enough
+   *  - no of attempts failing does not exceed maxFailedUniqueFetches (5)
+   *  - Stalled
+   * Expected result
+   *  - Do not throw errors, as Shuffle is yet to receive inputs
+   *
+   */
+  public void testReducerHealth_5() throws IOException {
+    long startTime = System.currentTimeMillis() - 500000;
+    Shuffle shuffle = mock(Shuffle.class);
+    final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle);
+
+    int totalProducerNodes = 20;
+
+    //Generate 319 events (last event has not arrived)
+    for (int i = 0; i < 319; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
+          10000, i, "hostUrl", inputAttemptIdentifier);
+    }
+
+    //All 319 known inputs succeed (original comment said "318 succeeds" but the
+    //loop runs 319 iterations; the one outstanding input is the 320th event)
+    for (int i = 0; i < 319; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      MapOutput mapOutput = MapOutput
+          .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
+              100, false);
+      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+    }
+
+    //A fetch failure is reported for input 318 (which already succeeded above);
+    //this simulates a late/duplicate failure report for the last fetch.
+    InputAttemptIdentifier inputAttemptIdentifier =
+        new InputAttemptIdentifier(new InputIdentifier(318), 0, "attempt_");
+    scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 % totalProducerNodes)
+        + ":" + 10000, ""), false, true, false);
+
+    //stall the shuffle
+    scheduler.lastProgressTime = System.currentTimeMillis() - 1000000;
+
+    //JUnit convention: expected value first, actual second
+    assertEquals(1, scheduler.remainingMaps.get());
+
+    //Retry for 3 more times
+    for (int retry = 0; retry < 3; retry++) {
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 %
+          totalProducerNodes) + ":" + 10000, ""), false, true, false);
+    }
+
+    //Shuffle has not received the events completely. So do not bail out yet.
+    verify(shuffle, times(0)).reportException(any(Throwable.class));
+  }
+
+
+  @Test(timeout = 60000)
+  /**
+   * Scenario
+   *  - Shuffle has NOT progressed enough
+   *  - Failures start happening
+   *  - no of attempts failing exceed maxFailedUniqueFetches (5)
+   *  - Not stalled
+   * Expected result
+   *  - Bail out
+   *
+   */
+  public void testReducerHealth_6() throws IOException {
+    // Run the same scenario with the since-last-completion check enabled first,
+    // then disabled, reusing one Configuration instance.
+    Configuration conf = new TezConfiguration();
+    for (boolean checkSinceLastCompletion : new boolean[] { true, false }) {
+      conf.setBoolean(
+          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION,
+          checkSinceLastCompletion);
+      _testReducerHealth_6(conf);
+    }
+  }
+
+  /**
+   * Shared body for testReducerHealth_6: failures exceed maxFailedUniqueFetches
+   * while the reducer has not progressed enough; whether the scheduler bails out
+   * depends on TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION in conf.
+   */
+  public void _testReducerHealth_6(Configuration conf) throws IOException {
+    long startTime = System.currentTimeMillis() - 500000;
+    Shuffle shuffle = mock(Shuffle.class);
+    final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320,
+        shuffle, conf);
+
+    int totalProducerNodes = 20;
+
+    //Generate all 320 events
+    for (int i = 0; i < 320; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
+          10000, i, "hostUrl", inputAttemptIdentifier);
+    }
+
+    //10 succeed
+    for (int i = 0; i < 10; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      MapOutput mapOutput = MapOutput
+          .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
+              100, false);
+      scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false);
+    }
+
+    //5 fetches fail once
+    for (int i = 10; i < 15; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+          + ":" + 10000, ""), false, true, false);
+    }
+
+    assertTrue(scheduler.failureCounts.size() >= 5);
+    //JUnit convention: expected value first, actual second
+    assertEquals(310, scheduler.remainingMaps.get());
+
+    //Do not bail out (number of failures is just 5).
+    //(scheduler.reporter is the same mock as shuffle; verify on shuffle for consistency)
+    verify(shuffle, times(0)).reportException(any(Throwable.class));
+
+    //5 fetches fail repeatedly (two more failures for each input)
+    for (int i = 10; i < 15; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier =
+          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+      for (int retry = 0; retry < 2; retry++) {
+        scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
+            + ":" + 10000, ""), false, true, false);
+      }
+    }
+
+    boolean checkFailedFetchSinceLastCompletion = conf.getBoolean(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT);
+    if (checkFailedFetchSinceLastCompletion) {
+      // Now bail out, as Shuffle has crossed the failedShufflesSinceLastCompletion
+      // limits (even though the reducer is otherwise healthy).
+      verify(shuffle, atLeast(1)).reportException(any(Throwable.class));
+    } else {
+      // Do not bail out yet. NOTE(review): atLeast(0) can never fail; it is kept
+      // only to document intent -- consider times(0) if "never reported" is the
+      // actual contract here.
+      verify(shuffle, atLeast(0)).reportException(any(Throwable.class));
+    }
+  }
+
+  @Test(timeout = 60000)
+  /**
+   * Scenario
+   *  - reducer has not progressed enough
+   *  - fetch fails >
+   *  TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION
+   * Expected result
+   *  - fail the reducer
+   */
+  public void testReducerHealth_7() throws IOException {
+    long startTime = System.currentTimeMillis() - 500000;
+    Shuffle shuffle = mock(Shuffle.class);
+    final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle);
+
+    int totalProducerNodes = 20;
+
+    // Make all 320 inputs known to the scheduler.
+    for (int srcIndex = 0; srcIndex < 320; srcIndex++) {
+      InputAttemptIdentifier srcAttempt =
+          new InputAttemptIdentifier(new InputIdentifier(srcIndex), 0, "attempt_");
+      scheduler.addKnownMapOutput("host" + (srcIndex % totalProducerNodes), 10000, srcIndex,
+          "hostUrl", srcAttempt);
+    }
+
+    // The first 100 inputs are fetched successfully.
+    for (int srcIndex = 0; srcIndex < 100; srcIndex++) {
+      InputAttemptIdentifier srcAttempt =
+          new InputAttemptIdentifier(new InputIdentifier(srcIndex), 0, "attempt_");
+      MapOutput mapOutput = MapOutput.createMemoryMapOutput(srcAttempt,
+          mock(FetchedInputAllocatorOrderedGrouped.class), 100, false);
+      scheduler.copySucceeded(srcAttempt,
+          new MapHost(srcIndex, "host" + (srcIndex % totalProducerNodes) + ":" + 10000, ""),
+          100, 200, startTime + (srcIndex * 100), mapOutput, false);
+    }
+
+    // The next 99 inputs each fail four times, pushing the per-host fetch
+    // failures past the acceptable fraction.
+    for (int srcIndex = 100; srcIndex < 199; srcIndex++) {
+      InputAttemptIdentifier srcAttempt =
+          new InputAttemptIdentifier(new InputIdentifier(srcIndex), 0, "attempt_");
+      for (int attempt = 0; attempt < 4; attempt++) {
+        scheduler.copyFailed(srcAttempt,
+            new MapHost(srcIndex, "host" + (srcIndex % totalProducerNodes) + ":" + 10000, ""),
+            false, true, false);
+      }
+    }
+
+    verify(shuffle, atLeast(1)).reportException(any(Throwable.class));
+  }
+
+  /**
+   * Builds a ShuffleSchedulerForTest backed by a mocked MergeManager, using the
+   * supplied configuration.
+   */
+  private ShuffleSchedulerForTest createScheduler(long startTime, int numInputs, Shuffle shuffle,
+      Configuration conf) throws IOException {
+    InputContext inputContext = createTezInputContext();
+    MergeManager mergeManager = mock(MergeManager.class);
+    return new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager,
+        mergeManager, startTime, null, false, 0, "srcName");
+  }
+
+  /** Convenience overload that uses a default TezConfiguration. */
+  private ShuffleSchedulerForTest createScheduler(long startTime, int numInputs, Shuffle shuffle)
+      throws IOException {
+    Configuration defaultConf = new TezConfiguration();
+    return createScheduler(startTime, numInputs, shuffle, defaultConf);
+  }
+
+ @Test(timeout = 60000)
+ public void testPenalty() throws IOException, InterruptedException {
+ long startTime = System.currentTimeMillis();
+ Shuffle shuffle = mock(Shuffle.class);
+ final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 1, shuffle);
+
+ InputAttemptIdentifier inputAttemptIdentifier =
+ new InputAttemptIdentifier(new InputIdentifier(0), 0, "attempt_");
+ scheduler.addKnownMapOutput("host0", 10000, 0, "hostUrl", inputAttemptIdentifier);
+
+ assertTrue(scheduler.pendingHosts.size() == 1);
+ assertTrue(scheduler.pendingHosts.iterator().next().getState() == MapHost.State.PENDING);
+ MapHost mapHost = scheduler.pendingHosts.iterator().next();
+
+ //Fails to pull from host0. host0 should be added to penalties
+ scheduler.copyFailed(inputAttemptIdentifier, mapHost, false, true, false);
+
+ //Should not get host, as it is added to penalty loop
+ MapHost host = scheduler.getHost();
+ assertFalse(host.getIdentifier(), host.getIdentifier().equalsIgnoreCase("host0:10000"));
+
+ //Referee thread would release it after INITIAL_PENALTY timeout
+ // NOTE(review): after the penalty expires one might expect getHost() to
+ // return host0 again -- confirm this second assertFalse is intentional.
+ Thread.sleep(ShuffleScheduler.INITIAL_PENALTY + 1000);
+ host = scheduler.getHost();
+ assertFalse(host.getIdentifier(), host.getIdentifier().equalsIgnoreCase("host0:10000"));
+ }
+
@Test(timeout = 5000)
public void testShutdown() throws Exception {
InputContext inputContext = createTezInputContext();
@@ -198,7 +815,7 @@ public class TestShuffleScheduler {
MapOutput mapOutput = MapOutput
.createMemoryMapOutput(identifiers[i], mock(FetchedInputAllocatorOrderedGrouped.class),
100, false);
- scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput);
+ scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput, false);
scheduler.freeHost(mapHosts[i]);
}
@@ -234,6 +851,7 @@ public class TestShuffleScheduler {
private final AtomicInteger numFetchersCreated = new AtomicInteger(0);
private final boolean fetcherShouldWait;
+ private final ExceptionReporter reporter;
public ShuffleSchedulerForTest(InputContext inputContext, Configuration conf,
int numberOfInputs,
@@ -258,6 +876,7 @@ public class TestShuffleScheduler {
super(inputContext, conf, numberOfInputs, shuffle, mergeManager, allocator, startTime, codec,
ifileReadAhead, ifileReadAheadLength, srcNameTrimmed);
this.fetcherShouldWait = fetcherShouldWait;
+ this.reporter = shuffle;
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java
index 674d4b4..49b5490 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java
@@ -58,6 +58,23 @@ public class TestOrderedGroupedMergedKVInputConfig {
fromConf.set("test.conf.key.1", "confkey1");
fromConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 1111);
fromConf.set("io.shouldExist", "io");
+ fromConf.setInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, 3);
+ fromConf.setFloat(
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION,
+ 0.1f);
+ fromConf.setFloat(
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION,
+ 0.6f);
+ fromConf.setInt(
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT,
+ 10);
+ fromConf.setFloat(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION, 0.2f);
+ fromConf.setFloat(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION, 0.6f);
+ fromConf.setBoolean(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, false);
Map<String, String> additionalConf = new HashMap<String, String>();
additionalConf.put("test.key.2", "key2");
additionalConf.put(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, "3");
@@ -96,6 +113,19 @@ public class TestOrderedGroupedMergedKVInputConfig {
assertEquals(0.22f, conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f), 0.001f);
assertEquals(0.33f, conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT, 0.0f), 0.001f);
assertEquals(0.44f, conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 0.00f), 0.001f);
+ assertEquals(0.1f, conf.getFloat(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION, 0.00f), 0.001f);
+ assertEquals(3, conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, 0), 0);
+ assertEquals(10, conf.getInt(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT, 0), 0);
+ assertEquals(0.6f, conf.getFloat(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION, 0.00f), 0.001f);
+ assertEquals(0.2f, conf.getFloat(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION, 0.00f), 0.001f);
+ assertEquals(0.6f, conf.getFloat(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION, 0.6f), 0.001f);
+ assertEquals(false, conf.getBoolean(TezRuntimeConfiguration
+ .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, true));
// Verify additional configs
assertEquals(false, conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,