You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/01 09:01:46 UTC

[29/50] [abbrv] hadoop git commit: MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon IOException during InMemoryMapOutput shuffle handler. Contributed by Eric Payne

MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon IOException during InMemoryMapOutput shuffle handler. Contributed by Eric Payne


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bc1bd7e5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bc1bd7e5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bc1bd7e5

Branch: refs/heads/HDFS-7240
Commit: bc1bd7e5c4047b374420683d36a8c30eda6d75b6
Parents: 5639bf0
Author: Jason Lowe <jl...@apache.org>
Authored: Tue Apr 28 20:17:52 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Tue Apr 28 20:19:05 2015 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 ++
 .../hadoop/mapreduce/task/reduce/Fetcher.java   |  6 ++--
 .../mapreduce/task/reduce/TestFetcher.java      | 34 ++++++++++++++++++++
 3 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc1bd7e5/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index d27a022..2090007 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -376,6 +376,9 @@ Release 2.7.1 - UNRELEASED
     MAPREDUCE-6252. JobHistoryServer should not fail when encountering a 
     missing directory. (Craig Welch via devaraj)
 
+    MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon
+    IOException during InMemoryMapOutput shuffle handler (Eric Payne via jlowe)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc1bd7e5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index d867e4b..4b80dc9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -553,7 +553,10 @@ class Fetcher<K,V> extends Thread {
       metrics.successFetch();
       return null;
     } catch (IOException ioe) {
-      
+      if (mapOutput != null) {
+        mapOutput.abort();
+      }
+
       if (canRetry) {
         checkTimeoutOrRetry(host, ioe);
       } 
@@ -574,7 +577,6 @@ class Fetcher<K,V> extends Thread {
                " from " + host.getHostName(), ioe); 
 
       // Inform the shuffle-scheduler
-      mapOutput.abort();
       metrics.failedFetch();
       return new TaskAttemptID[] {mapId};
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc1bd7e5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index 723df17..a9cd33e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -628,6 +628,40 @@ public class TestFetcher {
     verify(odmo).abort();
   }
 
+  @SuppressWarnings("unchecked")
+  @Test(timeout=10000)
+  public void testCopyFromHostWithRetryUnreserve() throws Exception {
+    InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
+    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(jobWithRetry,
+        id, ss, mm, r, metrics, except, key, connection);
+
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+
+    when(connection.getResponseCode()).thenReturn(200);
+    when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+        .thenReturn(replyHash);
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    header.write(new DataOutputStream(bout));
+    ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+    when(connection.getInputStream()).thenReturn(in);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+
+    // Verify that unreserve occurs if an exception happens after shuffle
+    // buffer is reserved.
+    when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+        .thenReturn(immo);
+    doThrow(new IOException("forced error")).when(immo).shuffle(
+        any(MapHost.class), any(InputStream.class), anyLong(),
+        anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
+
+    underTest.copyFromHost(host);
+    verify(immo).abort();
+  }
+
   public static class FakeFetcher<K,V> extends Fetcher<K,V> {
 
     // If connection need to be reopen.