You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/01 09:01:46 UTC
[29/50] [abbrv] hadoop git commit: MAPREDUCE-6334.
Fetcher#copyMapOutput is leaking usedMemory upon IOException during
InMemoryMapOutput shuffle handler. Contributed by Eric Payne
MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon IOException during InMemoryMapOutput shuffle handler. Contributed by Eric Payne
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bc1bd7e5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bc1bd7e5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bc1bd7e5
Branch: refs/heads/HDFS-7240
Commit: bc1bd7e5c4047b374420683d36a8c30eda6d75b6
Parents: 5639bf0
Author: Jason Lowe <jl...@apache.org>
Authored: Tue Apr 28 20:17:52 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Tue Apr 28 20:19:05 2015 +0000
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 ++
.../hadoop/mapreduce/task/reduce/Fetcher.java | 6 ++--
.../mapreduce/task/reduce/TestFetcher.java | 34 ++++++++++++++++++++
3 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc1bd7e5/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index d27a022..2090007 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -376,6 +376,9 @@ Release 2.7.1 - UNRELEASED
MAPREDUCE-6252. JobHistoryServer should not fail when encountering a
missing directory. (Craig Welch via devaraj)
+ MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon
+ IOException during InMemoryMapOutput shuffle handler (Eric Payne via jlowe)
+
Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc1bd7e5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index d867e4b..4b80dc9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -553,7 +553,10 @@ class Fetcher<K,V> extends Thread {
metrics.successFetch();
return null;
} catch (IOException ioe) {
-
+ if (mapOutput != null) {
+ mapOutput.abort();
+ }
+
if (canRetry) {
checkTimeoutOrRetry(host, ioe);
}
@@ -574,7 +577,6 @@ class Fetcher<K,V> extends Thread {
" from " + host.getHostName(), ioe);
// Inform the shuffle-scheduler
- mapOutput.abort();
metrics.failedFetch();
return new TaskAttemptID[] {mapId};
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc1bd7e5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index 723df17..a9cd33e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -628,6 +628,40 @@ public class TestFetcher {
verify(odmo).abort();
}
+ @SuppressWarnings("unchecked")
+ @Test(timeout=10000)
+ public void testCopyFromHostWithRetryUnreserve() throws Exception {
+ InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
+ Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(jobWithRetry,
+ id, ss, mm, r, metrics, except, key, connection);
+
+ String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+
+ when(connection.getResponseCode()).thenReturn(200);
+ when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+ .thenReturn(replyHash);
+ ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ header.write(new DataOutputStream(bout));
+ ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+ when(connection.getInputStream()).thenReturn(in);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+
+ // Verify that unreserve occurs if an exception happens after shuffle
+ // buffer is reserved.
+ when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+ .thenReturn(immo);
+ doThrow(new IOException("forced error")).when(immo).shuffle(
+ any(MapHost.class), any(InputStream.class), anyLong(),
+ anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
+
+ underTest.copyFromHost(host);
+ verify(immo).abort();
+ }
+
public static class FakeFetcher<K,V> extends Fetcher<K,V> {
// If connection need to be reopen.