You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/06/30 01:30:32 UTC

[16/50] [abbrv] hadoop git commit: MAPREDUCE-6400. Multiple shuffle transfer fails because input is closed too early. Contributed by Brahma Reddy Battula, Akira AJISAKA, and Gera Shegalov.

MAPREDUCE-6400. Multiple shuffle transfer fails because input is closed too early. Contributed by Brahma Reddy Battula, Akira AJISAKA, and Gera Shegalov.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c31cf732
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c31cf732
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c31cf732

Branch: refs/heads/YARN-2928
Commit: c31cf7323d76b23a013168a02a0814e2c162d031
Parents: 1b43de4
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jun 24 15:29:11 2015 +0000
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon Jun 29 10:28:23 2015 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt                 |  4 ++++
 .../apache/hadoop/mapreduce/task/reduce/Fetcher.java |  1 +
 .../mapreduce/task/reduce/IFileWrappedMapOutput.java | 10 ++--------
 .../hadoop/mapreduce/task/reduce/LocalFetcher.java   | 15 +++++----------
 4 files changed, 12 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c31cf732/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index a93a69a..1b7381d 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -529,6 +529,10 @@ Release 2.8.0 - UNRELEASED
     multibyte record delimiters well (Vinayakumar B, Rushabh Shah, and Akira
     AJISAKA via jlowe)
 
+    MAPREDUCE-6400. Multiple shuffle transfer fails because input is closed
+    too early (Brahma Reddy Battula, Akira AJISAKA, and Gera Shegalov via
+    jlowe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c31cf732/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 1e03387..fb0ac0a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -335,6 +335,7 @@ class Fetcher<K,V> extends Thread {
         try {
           failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
         } catch (IOException e) {
+          IOUtils.cleanup(LOG, input);
           //
           // Setup connection again if disconnected by NM
           connection.disconnect();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c31cf732/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
index 119db15..6051c34 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
@@ -60,13 +60,7 @@ public abstract class IFileWrappedMapOutput<K, V> extends MapOutput<K, V> {
                       long compressedLength, long decompressedLength,
                       ShuffleClientMetrics metrics,
                       Reporter reporter) throws IOException {
-    IFileInputStream iFin =
-        new IFileInputStream(input, compressedLength, conf);
-    try {
-      this.doShuffle(host, iFin, compressedLength,
-                    decompressedLength, metrics, reporter);
-    } finally {
-      iFin.close();
-    }
+    doShuffle(host, new IFileInputStream(input, compressedLength, conf),
+        compressedLength, decompressedLength, metrics, reporter);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c31cf732/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
index de2382c..f45742f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.IndexRecord;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapOutputFile;
@@ -149,19 +150,13 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
     // now read the file, seek to the appropriate section, and send it.
     FileSystem localFs = FileSystem.getLocal(job).getRaw();
     FSDataInputStream inStream = localFs.open(mapOutputFileName);
-
-    inStream = CryptoUtils.wrapIfNecessary(job, inStream);
-
     try {
+      inStream = CryptoUtils.wrapIfNecessary(job, inStream);
       inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
-      mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter);
+      mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
+          decompressedLength, metrics, reporter);
     } finally {
-      try {
-        inStream.close();
-      } catch (IOException ioe) {
-        LOG.warn("IOException closing inputstream from map output: "
-            + ioe.toString());
-      }
+      IOUtils.cleanup(LOG, inStream);
     }
 
     scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,