You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2015/09/26 19:33:46 UTC
hadoop git commit: MAPREDUCE-6334. Fetcher#copyMapOutput is leaking
usedMemory upon IOException during InMemoryMapOutput shuffle handler.
Contributed by Eric Payne (cherry picked from commit
bc1bd7e5c4047b374420683d36a8c30eda6d75b6) (cherry picked from c
Repository: hadoop
Updated Branches:
refs/heads/branch-2.6 1828ba00b -> 4104ab346
MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon IOException during InMemoryMapOutput shuffle handler. Contributed by Eric Payne
(cherry picked from commit bc1bd7e5c4047b374420683d36a8c30eda6d75b6)
(cherry picked from commit 30d0f10458fbe0e3a85ea2e22a1bb3d4454fd896)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4104ab34
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4104ab34
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4104ab34
Branch: refs/heads/branch-2.6
Commit: 4104ab346fdfe3d2296e41f9b7e105902ea1fc4d
Parents: 1828ba0
Author: Jason Lowe <jl...@apache.org>
Authored: Tue Apr 28 20:17:52 2015 +0000
Committer: Sangjin Lee <sj...@apache.org>
Committed: Sat Sep 26 10:32:27 2015 -0700
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 ++
.../hadoop/mapreduce/task/reduce/Fetcher.java | 6 ++--
.../mapreduce/task/reduce/TestFetcher.java | 34 ++++++++++++++++++++
3 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4104ab34/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 7270b8c..b2b74a9 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -16,6 +16,9 @@ Release 2.6.2 - UNRELEASED
cache files so that child processes running hadoop scripts can access these
files. (Junping Du via vinodkv)
+ MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon
+ IOException during InMemoryMapOutput shuffle handler (Eric Payne via jlowe)
+
Release 2.6.1 - 2015-09-23
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4104ab34/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index d867e4b..4b80dc9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -553,7 +553,10 @@ class Fetcher<K,V> extends Thread {
metrics.successFetch();
return null;
} catch (IOException ioe) {
-
+ if (mapOutput != null) {
+ mapOutput.abort();
+ }
+
if (canRetry) {
checkTimeoutOrRetry(host, ioe);
}
@@ -574,7 +577,6 @@ class Fetcher<K,V> extends Thread {
" from " + host.getHostName(), ioe);
// Inform the shuffle-scheduler
- mapOutput.abort();
metrics.failedFetch();
return new TaskAttemptID[] {mapId};
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4104ab34/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index 723df17..a9cd33e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -628,6 +628,40 @@ public class TestFetcher {
verify(odmo).abort();
}
+ @SuppressWarnings("unchecked")
+ @Test(timeout=10000)
+ public void testCopyFromHostWithRetryUnreserve() throws Exception {
+ InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
+ Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(jobWithRetry,
+ id, ss, mm, r, metrics, except, key, connection);
+
+ String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+
+ when(connection.getResponseCode()).thenReturn(200);
+ when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+ .thenReturn(replyHash);
+ ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ header.write(new DataOutputStream(bout));
+ ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+ when(connection.getInputStream()).thenReturn(in);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+
+ // Verify that unreserve occurs if an exception happens after shuffle
+ // buffer is reserved.
+ when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+ .thenReturn(immo);
+ doThrow(new IOException("forced error")).when(immo).shuffle(
+ any(MapHost.class), any(InputStream.class), anyLong(),
+ anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
+
+ underTest.copyFromHost(host);
+ verify(immo).abort();
+ }
+
public static class FakeFetcher<K,V> extends Fetcher<K,V> {
// If connection need to be reopen.