You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/06/24 19:50:55 UTC

[17/21] hadoop git commit: MAPREDUCE-6400. Multiple shuffle transfer fails because input is closed too early. Contributed by Brahma Reddy Battula, Akira AJISAKA, and Gera Shegalov.

MAPREDUCE-6400. Multiple shuffle transfer fails because input is closed too early. Contributed by Brahma Reddy Battula, Akira AJISAKA, and Gera Shegalov.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/72d08a0e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/72d08a0e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/72d08a0e

Branch: refs/heads/HDFS-7240
Commit: 72d08a0e41efda635baa985d55d67cb059a7c07c
Parents: 2ba6465
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jun 24 15:29:11 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Jun 24 15:29:11 2015 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt                 |  4 ++++
 .../apache/hadoop/mapreduce/task/reduce/Fetcher.java |  1 +
 .../mapreduce/task/reduce/IFileWrappedMapOutput.java | 10 ++--------
 .../hadoop/mapreduce/task/reduce/LocalFetcher.java   | 15 +++++----------
 4 files changed, 12 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/72d08a0e/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 5eae44e..6c65032 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -505,6 +505,10 @@ Release 2.8.0 - UNRELEASED
     multibyte record delimiters well (Vinayakumar B, Rushabh Shah, and Akira
     AJISAKA via jlowe)
 
+    MAPREDUCE-6400. Multiple shuffle transfer fails because input is closed
+    too early (Brahma Reddy Battula, Akira AJISAKA, and Gera Shegalov via
+    jlowe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/72d08a0e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 1e03387..fb0ac0a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -335,6 +335,7 @@ class Fetcher<K,V> extends Thread {
         try {
           failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
         } catch (IOException e) {
+          IOUtils.cleanup(LOG, input);
           //
           // Setup connection again if disconnected by NM
           connection.disconnect();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/72d08a0e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
index 119db15..6051c34 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
@@ -60,13 +60,7 @@ public abstract class IFileWrappedMapOutput<K, V> extends MapOutput<K, V> {
                       long compressedLength, long decompressedLength,
                       ShuffleClientMetrics metrics,
                       Reporter reporter) throws IOException {
-    IFileInputStream iFin =
-        new IFileInputStream(input, compressedLength, conf);
-    try {
-      this.doShuffle(host, iFin, compressedLength,
-                    decompressedLength, metrics, reporter);
-    } finally {
-      iFin.close();
-    }
+    doShuffle(host, new IFileInputStream(input, compressedLength, conf),
+        compressedLength, decompressedLength, metrics, reporter);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/72d08a0e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
index de2382c..f45742f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.IndexRecord;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapOutputFile;
@@ -149,19 +150,13 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
     // now read the file, seek to the appropriate section, and send it.
     FileSystem localFs = FileSystem.getLocal(job).getRaw();
     FSDataInputStream inStream = localFs.open(mapOutputFileName);
-
-    inStream = CryptoUtils.wrapIfNecessary(job, inStream);
-
     try {
+      inStream = CryptoUtils.wrapIfNecessary(job, inStream);
       inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
-      mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter);
+      mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
+          decompressedLength, metrics, reporter);
     } finally {
-      try {
-        inStream.close();
-      } catch (IOException ioe) {
-        LOG.warn("IOException closing inputstream from map output: "
-            + ioe.toString());
-      }
+      IOUtils.cleanup(LOG, inStream);
     }
 
     scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,