You are viewing a plain text version of this content. (The canonical link to the original message was not preserved in this plain-text version.)
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/04/02 20:58:25 UTC
hadoop git commit: MAPREDUCE-6303. Read timeout when retrying a fetch error can be fatal to a reducer. Contributed by Jason Lowe. (cherry picked from commit eccb7d46efbf07abcc6a01bd5e7d682f6815b824)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 6e36dbf03 -> cacadea63
MAPREDUCE-6303. Read timeout when retrying a fetch error can be fatal to a reducer. Contributed by Jason Lowe.
(cherry picked from commit eccb7d46efbf07abcc6a01bd5e7d682f6815b824)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cacadea6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cacadea6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cacadea6
Branch: refs/heads/branch-2
Commit: cacadea632f7ab6fe4fdb1432e1a2c48e8ebd55f
Parents: 6e36dbf
Author: Junping Du <ju...@apache.org>
Authored: Thu Apr 2 12:13:03 2015 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Thu Apr 2 12:15:12 2015 -0700
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../hadoop/mapreduce/task/reduce/Fetcher.java | 73 ++++++++++----------
.../mapreduce/task/reduce/TestFetcher.java | 33 +++++++++
3 files changed, 74 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cacadea6/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 34aa2ed..ad75410 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -265,6 +265,9 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-6285. ClientServiceDelegate should not retry upon
AuthenticationException. (Jonathan Eagles via ozawa)
+ MAPREDUCE-6303. Read timeout when retrying a fetch error can be fatal
+ to a reducer. (Jason Lowe via junping_du)
+
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cacadea6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 3f40853..d867e4b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -258,6 +258,39 @@ class Fetcher<K,V> extends Thread {
closeConnection();
}
+ private DataInputStream openShuffleUrl(MapHost host,
+ Set<TaskAttemptID> remaining, URL url) {
+ DataInputStream input = null;
+
+ try {
+ setupConnectionsWithRetry(host, remaining, url);
+ if (stopped) {
+ abortConnect(host, remaining);
+ } else {
+ input = new DataInputStream(connection.getInputStream());
+ }
+ } catch (IOException ie) {
+ boolean connectExcpt = ie instanceof ConnectException;
+ ioErrs.increment(1);
+ LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
+ " map outputs", ie);
+
+ // If connect did not succeed, just mark all the maps as failed,
+ // indirectly penalizing the host
+ scheduler.hostFailed(host.getHostName());
+ for(TaskAttemptID left: remaining) {
+ scheduler.copyFailed(left, host, false, connectExcpt);
+ }
+
+ // Add back all the remaining maps, WITHOUT marking them as failed
+ for(TaskAttemptID left: remaining) {
+ scheduler.putBackKnownMapOutput(host, left);
+ }
+ }
+
+ return input;
+ }
+
/**
* The crux of the matter...
*
@@ -286,38 +319,12 @@ class Fetcher<K,V> extends Thread {
Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);
// Construct the url and connect
- DataInputStream input = null;
URL url = getMapOutputURL(host, maps);
- try {
- setupConnectionsWithRetry(host, remaining, url);
-
- if (stopped) {
- abortConnect(host, remaining);
- return;
- }
- } catch (IOException ie) {
- boolean connectExcpt = ie instanceof ConnectException;
- ioErrs.increment(1);
- LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
- " map outputs", ie);
-
- // If connect did not succeed, just mark all the maps as failed,
- // indirectly penalizing the host
- scheduler.hostFailed(host.getHostName());
- for(TaskAttemptID left: remaining) {
- scheduler.copyFailed(left, host, false, connectExcpt);
- }
-
- // Add back all the remaining maps, WITHOUT marking them as failed
- for(TaskAttemptID left: remaining) {
- scheduler.putBackKnownMapOutput(host, left);
- }
-
+ DataInputStream input = openShuffleUrl(host, remaining, url);
+ if (input == null) {
return;
}
- input = new DataInputStream(connection.getInputStream());
-
try {
// Loop through available map-outputs and fetch them
// On any error, faildTasks is not null and we exit
@@ -333,14 +340,10 @@ class Fetcher<K,V> extends Thread {
connection.disconnect();
// Get map output from remaining tasks only.
url = getMapOutputURL(host, remaining);
-
- // Connect with retry as expecting host's recovery take sometime.
- setupConnectionsWithRetry(host, remaining, url);
- if (stopped) {
- abortConnect(host, remaining);
+ input = openShuffleUrl(host, remaining, url);
+ if (input == null) {
return;
}
- input = new DataInputStream(connection.getInputStream());
}
}
@@ -591,7 +594,7 @@ class Fetcher<K,V> extends Thread {
// Retry is not timeout, let's do retry with throwing an exception.
if (currentTime - retryStartTime < this.fetchRetryTimeout) {
LOG.warn("Shuffle output from " + host.getHostName() +
- " failed, retry it.");
+ " failed, retry it.", ioe);
throw ioe;
} else {
// timeout, prepare to be failed.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cacadea6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index 929c0ae..723df17 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -388,6 +388,39 @@ public class TestFetcher {
anyBoolean(), anyBoolean());
}
+ @SuppressWarnings("unchecked")
+ @Test(timeout=10000)
+ public void testCopyFromHostWithRetryThenTimeout() throws Exception {
+ InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
+ Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(jobWithRetry,
+ id, ss, mm, r, metrics, except, key, connection);
+
+ String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+
+ when(connection.getResponseCode()).thenReturn(200)
+ .thenThrow(new SocketTimeoutException("forced timeout"));
+ when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+ .thenReturn(replyHash);
+ ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ header.write(new DataOutputStream(bout));
+ ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+ when(connection.getInputStream()).thenReturn(in);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+ when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+ .thenReturn(immo);
+ doThrow(new IOException("forced error")).when(immo).shuffle(
+ any(MapHost.class), any(InputStream.class), anyLong(),
+ anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
+
+ underTest.copyFromHost(host);
+ verify(allErrs).increment(1);
+ verify(ss).copyFailed(map1ID, host, false, false);
+ }
+
@Test
public void testCopyFromHostExtraBytes() throws Exception {
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,