You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2015/09/26 19:33:46 UTC

hadoop git commit: MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon IOException during InMemoryMapOutput shuffle handler. Contributed by Eric Payne (cherry picked from commit bc1bd7e5c4047b374420683d36a8c30eda6d75b6) (cherry picked from c

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 1828ba00b -> 4104ab346


MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon IOException during InMemoryMapOutput shuffle handler. Contributed by Eric Payne
(cherry picked from commit bc1bd7e5c4047b374420683d36a8c30eda6d75b6)
(cherry picked from commit 30d0f10458fbe0e3a85ea2e22a1bb3d4454fd896)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4104ab34
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4104ab34
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4104ab34

Branch: refs/heads/branch-2.6
Commit: 4104ab346fdfe3d2296e41f9b7e105902ea1fc4d
Parents: 1828ba0
Author: Jason Lowe <jl...@apache.org>
Authored: Tue Apr 28 20:17:52 2015 +0000
Committer: Sangjin Lee <sj...@apache.org>
Committed: Sat Sep 26 10:32:27 2015 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 ++
 .../hadoop/mapreduce/task/reduce/Fetcher.java   |  6 ++--
 .../mapreduce/task/reduce/TestFetcher.java      | 34 ++++++++++++++++++++
 3 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4104ab34/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 7270b8c..b2b74a9 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -16,6 +16,9 @@ Release 2.6.2 - UNRELEASED
     cache files so that child processes running hadoop scripts can access these
     files. (Junping Du via vinodkv)
 
+    MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon
+    IOException during InMemoryMapOutput shuffle handler (Eric Payne via jlowe)
+
 Release 2.6.1 - 2015-09-23
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4104ab34/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index d867e4b..4b80dc9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -553,7 +553,10 @@ class Fetcher<K,V> extends Thread {
       metrics.successFetch();
       return null;
     } catch (IOException ioe) {
-      
+      if (mapOutput != null) {
+        mapOutput.abort();
+      }
+
       if (canRetry) {
         checkTimeoutOrRetry(host, ioe);
       } 
@@ -574,7 +577,6 @@ class Fetcher<K,V> extends Thread {
                " from " + host.getHostName(), ioe); 
 
       // Inform the shuffle-scheduler
-      mapOutput.abort();
       metrics.failedFetch();
       return new TaskAttemptID[] {mapId};
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4104ab34/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index 723df17..a9cd33e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -628,6 +628,40 @@ public class TestFetcher {
     verify(odmo).abort();
   }
 
+  @SuppressWarnings("unchecked")
+  @Test(timeout=10000)
+  public void testCopyFromHostWithRetryUnreserve() throws Exception {
+    InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
+    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(jobWithRetry,
+        id, ss, mm, r, metrics, except, key, connection);
+
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+
+    when(connection.getResponseCode()).thenReturn(200);
+    when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+        .thenReturn(replyHash);
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    header.write(new DataOutputStream(bout));
+    ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+    when(connection.getInputStream()).thenReturn(in);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+
+    // Verify that unreserve occurs if an exception happens after shuffle
+    // buffer is reserved.
+    when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+        .thenReturn(immo);
+    doThrow(new IOException("forced error")).when(immo).shuffle(
+        any(MapHost.class), any(InputStream.class), anyLong(),
+        anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
+
+    underTest.copyFromHost(host);
+    verify(immo).abort();
+  }
+
   public static class FakeFetcher<K,V> extends Fetcher<K,V> {
 
     // If connection need to be reopen.