You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2017/02/03 23:38:31 UTC
tez git commit: TEZ-3608. Fetcher can hang if copyMapOutput/fetchInputs returns early (Kuhu Shukla via jeagles)
Repository: tez
Updated Branches:
refs/heads/TEZ-3334 de2e0f268 -> c08eddf01
TEZ-3608. Fetcher can hang if copyMapOutput/fetchInputs returns early (Kuhu Shukla via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c08eddf0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c08eddf0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c08eddf0
Branch: refs/heads/TEZ-3334
Commit: c08eddf01d99e7e245d68fa8a4f6ab191308b4a7
Parents: de2e0f2
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Fri Feb 3 17:38:17 2017 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Fri Feb 3 17:38:17 2017 -0600
----------------------------------------------------------------------
TEZ-3334-CHANGES.txt | 1 +
.../tez/runtime/library/common/shuffle/Fetcher.java | 8 +++-----
.../shuffle/orderedgrouped/FetcherOrderedGrouped.java | 12 +++++-------
.../common/shuffle/orderedgrouped/TestFetcher.java | 2 +-
4 files changed, 10 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/c08eddf0/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index 1ba8ca6..025f53d 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
INCOMPATIBLE CHANGES:
ALL CHANGES:
+ TEZ-3608. Fetcher can hang if copyMapOutput/fetchInputs returns early
TEZ-3606. Fix debug log for empty partitions to the expanded partitionId in the Composite case
TEZ-3604. Remove the compositeInputAttemptIdentifier from remaining list upon fetch completion in the Ordered case
TEZ-3599. Unordered Fetcher can hang if empty partitions are present
http://git-wip-us.apache.org/repos/asf/tez/blob/c08eddf0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index a083daa..3afbb23 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -596,10 +596,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
false);
}
try {
- failedInputs = fetchInputs(input, callback);
- if(failedInputs == null || failedInputs.length == 0) {
- srcAttemptsRemaining.remove(inputAttemptIdentifier.toString());
- }
+ failedInputs = fetchInputs(input, callback, inputAttemptIdentifier);
} catch (FetcherReadTimeoutException e) {
//clean up connection
shutdownInternal(true);
@@ -833,7 +830,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
}
}
private InputAttemptIdentifier[] fetchInputs(DataInputStream input,
- CachingCallBack callback) throws FetcherReadTimeoutException {
+ CachingCallBack callback, InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException {
FetchedInput fetchedInput = null;
InputAttemptIdentifier srcAttemptId = null;
long decompressedLength = 0;
@@ -972,6 +969,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
// Note successful shuffle
// metrics.successFetch();
}
+ srcAttemptsRemaining.remove(inputAttemptIdentifier.toString());
} catch (IOException ioe) {
if (isShutDown.get()) {
cleanupFetchedInput(fetchedInput);
http://git-wip-us.apache.org/repos/asf/tez/blob/c08eddf0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 6bdb453..1bfd2a6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -278,16 +278,13 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
// yet_to_be_fetched list and marking the failed tasks.
InputAttemptIdentifier[] failedTasks = null;
while (!remaining.isEmpty() && failedTasks == null) {
- String inputAttemptIdentifierId =
- remaining.entrySet().iterator().next().getKey();
+ InputAttemptIdentifier inputAttemptIdentifier =
+ remaining.entrySet().iterator().next().getValue();
// fail immediately after first failure because we dont know how much to
// skip for this error in the input stream. So we cannot move on to the
// remaining outputs. YARN-1773. Will get to them in the next retry.
try {
- failedTasks = copyMapOutput(host, input);
- if (failedTasks == null || failedTasks.length == 0) {
- remaining.remove(inputAttemptIdentifierId);
- }
+ failedTasks = copyMapOutput(host, input, inputAttemptIdentifier);
} catch (FetcherReadTimeoutException e) {
// Setup connection again if disconnected
cleanupCurrentConnection(true);
@@ -431,7 +428,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
}
protected InputAttemptIdentifier[] copyMapOutput(MapHost host,
- DataInputStream input) throws FetcherReadTimeoutException {
+ DataInputStream input, InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException {
MapOutput mapOutput = null;
InputAttemptIdentifier srcAttemptId = null;
long decompressedLength = 0;
@@ -575,6 +572,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
// Note successful shuffle
metrics.successFetch();
}
+ remaining.remove(inputAttemptIdentifier.toString());
} catch(IOException ioe) {
if (stopped) {
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/c08eddf0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 54b0279..dfa473b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -561,7 +561,7 @@ public class TestFetcher {
// Throw IOException when fetcher tries to connect again to the same node
throw new FetcherReadTimeoutException("creating fetcher socket read timeout exception");
}
- }).when(fetcher).copyMapOutput(any(MapHost.class), any(DataInputStream.class));
+ }).when(fetcher).copyMapOutput(any(MapHost.class), any(DataInputStream.class), any(InputAttemptIdentifier.class));
try {
fetcher.copyFromHost(host);