You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2015/06/24 17:29:53 UTC
hadoop git commit: MAPREDUCE-6400. Multiple shuffle transfer fails
because input is closed too early. Contributed by Brahma Reddy Battula,
Akira AJISAKA, and Gera Shegalov.
Repository: hadoop
Updated Branches:
refs/heads/trunk 2ba646572 -> 72d08a0e4
MAPREDUCE-6400. Multiple shuffle transfer fails because input is closed too early. Contributed by Brahma Reddy Battula, Akira AJISAKA, and Gera Shegalov.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/72d08a0e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/72d08a0e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/72d08a0e
Branch: refs/heads/trunk
Commit: 72d08a0e41efda635baa985d55d67cb059a7c07c
Parents: 2ba6465
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jun 24 15:29:11 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Jun 24 15:29:11 2015 +0000
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 4 ++++
.../apache/hadoop/mapreduce/task/reduce/Fetcher.java | 1 +
.../mapreduce/task/reduce/IFileWrappedMapOutput.java | 10 ++--------
.../hadoop/mapreduce/task/reduce/LocalFetcher.java | 15 +++++----------
4 files changed, 12 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/72d08a0e/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 5eae44e..6c65032 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -505,6 +505,10 @@ Release 2.8.0 - UNRELEASED
multibyte record delimiters well (Vinayakumar B, Rushabh Shah, and Akira
AJISAKA via jlowe)
+ MAPREDUCE-6400. Multiple shuffle transfer fails because input is closed
+ too early (Brahma Reddy Battula, Akira AJISAKA, and Gera Shegalov via
+ jlowe)
+
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/72d08a0e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 1e03387..fb0ac0a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -335,6 +335,7 @@ class Fetcher<K,V> extends Thread {
try {
failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
} catch (IOException e) {
+ IOUtils.cleanup(LOG, input);
//
// Setup connection again if disconnected by NM
connection.disconnect();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/72d08a0e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
index 119db15..6051c34 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
@@ -60,13 +60,7 @@ public abstract class IFileWrappedMapOutput<K, V> extends MapOutput<K, V> {
long compressedLength, long decompressedLength,
ShuffleClientMetrics metrics,
Reporter reporter) throws IOException {
- IFileInputStream iFin =
- new IFileInputStream(input, compressedLength, conf);
- try {
- this.doShuffle(host, iFin, compressedLength,
- decompressedLength, metrics, reporter);
- } finally {
- iFin.close();
- }
+ doShuffle(host, new IFileInputStream(input, compressedLength, conf),
+ compressedLength, decompressedLength, metrics, reporter);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/72d08a0e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
index de2382c..f45742f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.IndexRecord;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
@@ -149,19 +150,13 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
// now read the file, seek to the appropriate section, and send it.
FileSystem localFs = FileSystem.getLocal(job).getRaw();
FSDataInputStream inStream = localFs.open(mapOutputFileName);
-
- inStream = CryptoUtils.wrapIfNecessary(job, inStream);
-
try {
+ inStream = CryptoUtils.wrapIfNecessary(job, inStream);
inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
- mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter);
+ mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
+ decompressedLength, metrics, reporter);
} finally {
- try {
- inStream.close();
- } catch (IOException ioe) {
- LOG.warn("IOException closing inputstream from map output: "
- + ioe.toString());
- }
+ IOUtils.cleanup(LOG, inStream);
}
scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,