You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2017/02/03 23:38:31 UTC

tez git commit: TEZ-3608. Fetcher can hang if copyMapOutput/fetchInputs returns early (Kuhu Shukla via jeagles)

Repository: tez
Updated Branches:
  refs/heads/TEZ-3334 de2e0f268 -> c08eddf01


TEZ-3608. Fetcher can hang if copyMapOutput/fetchInputs returns early (Kuhu Shukla via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c08eddf0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c08eddf0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c08eddf0

Branch: refs/heads/TEZ-3334
Commit: c08eddf01d99e7e245d68fa8a4f6ab191308b4a7
Parents: de2e0f2
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Fri Feb 3 17:38:17 2017 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Fri Feb 3 17:38:17 2017 -0600

----------------------------------------------------------------------
 TEZ-3334-CHANGES.txt                                    |  1 +
 .../tez/runtime/library/common/shuffle/Fetcher.java     |  8 +++-----
 .../shuffle/orderedgrouped/FetcherOrderedGrouped.java   | 12 +++++-------
 .../common/shuffle/orderedgrouped/TestFetcher.java      |  2 +-
 4 files changed, 10 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c08eddf0/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index 1ba8ca6..025f53d 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+  TEZ-3608. Fetcher can hang if copyMapOutput/fetchInputs returns early
   TEZ-3606. Fix debug log for empty partitions to the expanded partitionId in the Composite case
   TEZ-3604. Remove the compositeInputAttemptIdentifier from remaining list upon fetch completion in the Ordered case
   TEZ-3599. Unordered Fetcher can hang if empty partitions are present

http://git-wip-us.apache.org/repos/asf/tez/blob/c08eddf0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index a083daa..3afbb23 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -596,10 +596,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
             false);
       }
       try {
-        failedInputs = fetchInputs(input, callback);
-        if(failedInputs == null || failedInputs.length == 0) {
-         srcAttemptsRemaining.remove(inputAttemptIdentifier.toString());
-        }
+        failedInputs = fetchInputs(input, callback, inputAttemptIdentifier);
       } catch (FetcherReadTimeoutException e) {
         //clean up connection
         shutdownInternal(true);
@@ -833,7 +830,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     }
   }
   private InputAttemptIdentifier[] fetchInputs(DataInputStream input,
-      CachingCallBack callback) throws FetcherReadTimeoutException {
+      CachingCallBack callback, InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException {
     FetchedInput fetchedInput = null;
     InputAttemptIdentifier srcAttemptId = null;
     long decompressedLength = 0;
@@ -972,6 +969,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         // Note successful shuffle
         // metrics.successFetch();
       }
+      srcAttemptsRemaining.remove(inputAttemptIdentifier.toString());
     } catch (IOException ioe) {
       if (isShutDown.get()) {
         cleanupFetchedInput(fetchedInput);

http://git-wip-us.apache.org/repos/asf/tez/blob/c08eddf0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 6bdb453..1bfd2a6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -278,16 +278,13 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
       // yet_to_be_fetched list and marking the failed tasks.
       InputAttemptIdentifier[] failedTasks = null;
       while (!remaining.isEmpty() && failedTasks == null) {
-        String inputAttemptIdentifierId =
-            remaining.entrySet().iterator().next().getKey();
+        InputAttemptIdentifier inputAttemptIdentifier =
+            remaining.entrySet().iterator().next().getValue();
         // fail immediately after first failure because we dont know how much to
         // skip for this error in the input stream. So we cannot move on to the 
         // remaining outputs. YARN-1773. Will get to them in the next retry.
         try {
-          failedTasks = copyMapOutput(host, input);
-          if (failedTasks == null || failedTasks.length == 0) {
-            remaining.remove(inputAttemptIdentifierId);
-          }
+          failedTasks = copyMapOutput(host, input, inputAttemptIdentifier);
         } catch (FetcherReadTimeoutException e) {
           // Setup connection again if disconnected
           cleanupCurrentConnection(true);
@@ -431,7 +428,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
   }
 
   protected InputAttemptIdentifier[] copyMapOutput(MapHost host,
-                                DataInputStream input) throws FetcherReadTimeoutException {
+                                DataInputStream input, InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException {
     MapOutput mapOutput = null;
     InputAttemptIdentifier srcAttemptId = null;
     long decompressedLength = 0;
@@ -575,6 +572,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
         // Note successful shuffle
         metrics.successFetch();
       }
+      remaining.remove(inputAttemptIdentifier.toString());
     } catch(IOException ioe) {
       if (stopped) {
         if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/c08eddf0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 54b0279..dfa473b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -561,7 +561,7 @@ public class TestFetcher {
         // Throw IOException when fetcher tries to connect again to the same node
         throw new FetcherReadTimeoutException("creating fetcher socket read timeout exception");
       }
-    }).when(fetcher).copyMapOutput(any(MapHost.class), any(DataInputStream.class));
+    }).when(fetcher).copyMapOutput(any(MapHost.class), any(DataInputStream.class), any(InputAttemptIdentifier.class));
 
     try {
       fetcher.copyFromHost(host);