You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ge...@apache.org on 2015/06/04 01:55:09 UTC

hadoop git commit: MAPREDUCE-6174. Combine common stream code into parent class for InMemoryMapOutput and OnDiskMapOutput. (Eric Payne via gera)

Repository: hadoop
Updated Branches:
  refs/heads/trunk bc85959ed -> d90c13e2d


MAPREDUCE-6174. Combine common stream code into parent class for InMemoryMapOutput and OnDiskMapOutput. (Eric Payne via gera)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d90c13e2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d90c13e2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d90c13e2

Branch: refs/heads/trunk
Commit: d90c13e2da8867661bf19a802add70145ab9a462
Parents: bc85959
Author: Gera Shegalov <ge...@apache.org>
Authored: Wed Jun 3 16:26:45 2015 -0700
Committer: Gera Shegalov <ge...@apache.org>
Committed: Wed Jun 3 16:50:26 2015 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 +
 .../task/reduce/IFileWrappedMapOutput.java      | 72 ++++++++++++++++++++
 .../task/reduce/InMemoryMapOutput.java          | 26 ++-----
 .../mapreduce/task/reduce/MergeManagerImpl.java |  5 +-
 .../mapreduce/task/reduce/OnDiskMapOutput.java  | 33 +++++----
 .../mapreduce/task/reduce/TestFetcher.java      | 27 ++++----
 6 files changed, 114 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d90c13e2/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index f75b1aa..dac3b5b 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -334,6 +334,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-5248. Let NNBenchWithoutMR specify the replication factor for
     its test (Erik Paulson via jlowe)
 
+    MAPREDUCE-6174. Combine common stream code into parent class for
+    InMemoryMapOutput and OnDiskMapOutput. (Eric Payne via gera)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d90c13e2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
new file mode 100644
index 0000000..119db15
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.IFileInputStream;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * Common code for allowing MapOutput classes to handle streams.
+ *
+ * @param <K> key type for map output
+ * @param <V> value type for map output
+ */
+public abstract class IFileWrappedMapOutput<K, V> extends MapOutput<K, V> {
+  private final Configuration conf;
+  private final MergeManagerImpl<K, V> merger;
+
+  public IFileWrappedMapOutput(
+      Configuration c, MergeManagerImpl<K, V> m, TaskAttemptID mapId,
+      long size, boolean primaryMapOutput) {
+    super(mapId, size, primaryMapOutput);
+    conf = c;
+    merger = m;
+  }
+
+  /**
+   * @return the merger
+   */
+  protected MergeManagerImpl<K, V> getMerger() {
+    return merger;
+  }
+
+  protected abstract void doShuffle(
+      MapHost host, IFileInputStream iFileInputStream,
+      long compressedLength, long decompressedLength,
+      ShuffleClientMetrics metrics, Reporter reporter) throws IOException;
+
+  @Override
+  public void shuffle(MapHost host, InputStream input,
+                      long compressedLength, long decompressedLength,
+                      ShuffleClientMetrics metrics,
+                      Reporter reporter) throws IOException {
+    IFileInputStream iFin =
+        new IFileInputStream(input, compressedLength, conf);
+    try {
+      this.doShuffle(host, iFin, compressedLength,
+                    decompressedLength, metrics, reporter);
+    } finally {
+      iFin.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d90c13e2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java
index 24fb3bb..9b61ad5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java
@@ -42,10 +42,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
+class InMemoryMapOutput<K, V> extends IFileWrappedMapOutput<K, V> {
   private static final Log LOG = LogFactory.getLog(InMemoryMapOutput.class);
-  private Configuration conf;
-  private final MergeManagerImpl<K, V> merger;
   private final byte[] memory;
   private BoundedByteArrayOutputStream byteStream;
   // Decompression of map-outputs
@@ -56,9 +54,7 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
                            MergeManagerImpl<K, V> merger,
                            int size, CompressionCodec codec,
                            boolean primaryMapOutput) {
-    super(mapId, (long)size, primaryMapOutput);
-    this.conf = conf;
-    this.merger = merger;
+    super(conf, merger, mapId, (long)size, primaryMapOutput);
     this.codec = codec;
     byteStream = new BoundedByteArrayOutputStream(size);
     memory = byteStream.getBuffer();
@@ -78,15 +74,12 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
   }
 
   @Override
-  public void shuffle(MapHost host, InputStream input,
+  protected void doShuffle(MapHost host, IFileInputStream iFin,
                       long compressedLength, long decompressedLength,
                       ShuffleClientMetrics metrics,
                       Reporter reporter) throws IOException {
-    IFileInputStream checksumIn = 
-      new IFileInputStream(input, compressedLength, conf);
+    InputStream input = iFin;
 
-    input = checksumIn;       
-  
     // Are map-outputs compressed?
     if (codec != null) {
       decompressor.reset();
@@ -111,13 +104,6 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
         throw new IOException("Unexpected extra bytes from input stream for " +
                                getMapId());
       }
-
-    } catch (IOException ioe) {      
-      // Close the streams
-      IOUtils.cleanup(LOG, input);
-
-      // Re-throw
-      throw ioe;
     } finally {
       CodecPool.returnDecompressor(decompressor);
     }
@@ -125,12 +111,12 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
 
   @Override
   public void commit() throws IOException {
-    merger.closeInMemoryFile(this);
+    getMerger().closeInMemoryFile(this);
   }
   
   @Override
   public void abort() {
-    merger.unreserve(memory.length);
+    getMerger().unreserve(memory.length);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d90c13e2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
index f788707..c99a330 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -263,8 +263,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
       LOG.info(mapId + ": Shuffling to disk since " + requestedSize + 
                " is greater than maxSingleShuffleLimit (" + 
                maxSingleShuffleLimit + ")");
-      return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
-                                      jobConf, mapOutputFile, fetcher, true);
+      return new OnDiskMapOutput<K,V>(mapId, this, requestedSize, jobConf,
+         fetcher, true, FileSystem.getLocal(jobConf).getRaw(),
+         mapOutputFile.getInputFileForWrite(mapId.getTaskID(), requestedSize));
     }
     
     // Stall shuffle if we are above the memory limit

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d90c13e2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
index 8275fd0..f22169d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
@@ -18,13 +18,11 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -46,41 +44,46 @@ import com.google.common.annotations.VisibleForTesting;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
+class OnDiskMapOutput<K, V> extends IFileWrappedMapOutput<K, V> {
   private static final Log LOG = LogFactory.getLog(OnDiskMapOutput.class);
   private final FileSystem fs;
   private final Path tmpOutputPath;
   private final Path outputPath;
-  private final MergeManagerImpl<K, V> merger;
   private final OutputStream disk; 
   private long compressedSize;
-  private final Configuration conf;
 
+  @Deprecated
   public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                          MergeManagerImpl<K,V> merger, long size,
                          JobConf conf,
                          MapOutputFile mapOutputFile,
                          int fetcher, boolean primaryMapOutput)
       throws IOException {
-    this(mapId, reduceId, merger, size, conf, mapOutputFile, fetcher,
+    this(mapId, merger, size, conf, fetcher,
         primaryMapOutput, FileSystem.getLocal(conf).getRaw(),
         mapOutputFile.getInputFileForWrite(mapId.getTaskID(), size));
   }
 
-  @VisibleForTesting
+  @Deprecated
   OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                          MergeManagerImpl<K,V> merger, long size,
                          JobConf conf,
                          MapOutputFile mapOutputFile,
                          int fetcher, boolean primaryMapOutput,
                          FileSystem fs, Path outputPath) throws IOException {
-    super(mapId, size, primaryMapOutput);
+    this(mapId, merger, size, conf, fetcher, primaryMapOutput, fs, outputPath);
+  }
+
+  OnDiskMapOutput(TaskAttemptID mapId,
+                  MergeManagerImpl<K, V> merger, long size,
+                  JobConf conf,
+                  int fetcher, boolean primaryMapOutput,
+                  FileSystem fs, Path outputPath) throws IOException {
+    super(conf, merger, mapId, size, primaryMapOutput);
     this.fs = fs;
-    this.merger = merger;
     this.outputPath = outputPath;
     tmpOutputPath = getTempPath(outputPath, fetcher);
     disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
-    this.conf = conf;
   }
 
   @VisibleForTesting
@@ -89,18 +92,18 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
   }
 
   @Override
-  public void shuffle(MapHost host, InputStream input,
+  protected void doShuffle(MapHost host, IFileInputStream input,
                       long compressedLength, long decompressedLength,
                       ShuffleClientMetrics metrics,
                       Reporter reporter) throws IOException {
-    input = new IFileInputStream(input, compressedLength, conf);
     // Copy data to local-disk
     long bytesLeft = compressedLength;
     try {
       final int BYTES_TO_READ = 64 * 1024;
       byte[] buf = new byte[BYTES_TO_READ];
       while (bytesLeft > 0) {
-        int n = ((IFileInputStream)input).readWithChecksum(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+        int n = input.readWithChecksum(buf, 0,
+                                      (int) Math.min(bytesLeft, BYTES_TO_READ));
         if (n < 0) {
           throw new IOException("read past end of stream reading " + 
                                 getMapId());
@@ -117,7 +120,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
       disk.close();
     } catch (IOException ioe) {
       // Close the streams
-      IOUtils.cleanup(LOG, input, disk);
+      IOUtils.cleanup(LOG, disk);
 
       // Re-throw
       throw ioe;
@@ -139,7 +142,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
     fs.rename(tmpOutputPath, outputPath);
     CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
         getSize(), this.compressedSize);
-    merger.closeOnDiskFile(compressAwarePath);
+    getMerger().closeOnDiskFile(compressAwarePath);
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d90c13e2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index a9cd33e..7888007 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -19,9 +19,7 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.FilterInputStream;
-
 import java.lang.Void;
-
 import java.net.HttpURLConnection;
 
 import org.apache.hadoop.fs.ChecksumException;
@@ -30,13 +28,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskID;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TestName;
-import static org.junit.Assert.*;
 
+import static org.junit.Assert.*;
 import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.*;
 
@@ -65,10 +62,11 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
 import org.junit.Test;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.nimbusds.jose.util.StringUtils;
+
 /**
  * Test that the Fetcher does what we expect it to.
  */
@@ -453,9 +451,9 @@ public class TestFetcher {
     ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
     when(connection.getInputStream()).thenReturn(in);
     // 8 < 10 therefore there appear to be extra bytes in the IFileInputStream
-    InMemoryMapOutput<Text,Text> mapOut = new InMemoryMapOutput<Text, Text>(
+    IFileWrappedMapOutput<Text,Text> mapOut = new InMemoryMapOutput<Text, Text>(
         job, map1ID, mm, 8, null, true );
-    InMemoryMapOutput<Text,Text> mapOut2 = new InMemoryMapOutput<Text, Text>(
+    IFileWrappedMapOutput<Text,Text> mapOut2 = new InMemoryMapOutput<Text, Text>(
         job, map2ID, mm, 10, null, true );
 
     when(mm.reserve(eq(map1ID), anyLong(), anyInt())).thenReturn(mapOut);
@@ -478,9 +476,9 @@ public class TestFetcher {
     Path shuffledToDisk =
         OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher);
     fs = FileSystem.getLocal(job).getRaw();
-    MapOutputFile mof = mock(MapOutputFile.class);
-    OnDiskMapOutput<Text,Text> odmo = new OnDiskMapOutput<Text,Text>(map1ID,
-        id, mm, 100L, job, mof, fetcher, true, fs, onDiskMapOutputPath);
+    IFileWrappedMapOutput<Text,Text> odmo =
+        new OnDiskMapOutput<Text,Text>(map1ID, mm, 100L, job, fetcher, true,
+                                       fs, onDiskMapOutputPath);
 
     String mapData = "MAPDATA12345678901234567890";
 
@@ -538,7 +536,7 @@ public class TestFetcher {
   @Test(timeout=10000)
   public void testInterruptInMemory() throws Exception {
     final int FETCHER = 2;
-    InMemoryMapOutput<Text,Text> immo = spy(new InMemoryMapOutput<Text,Text>(
+    IFileWrappedMapOutput<Text,Text> immo = spy(new InMemoryMapOutput<Text,Text>(
           job, id, mm, 100, null, true));
     when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
         .thenReturn(immo);
@@ -584,10 +582,9 @@ public class TestFetcher {
     Path p = new Path("file:///tmp/foo");
     Path pTmp = OnDiskMapOutput.getTempPath(p, FETCHER);
     FileSystem mFs = mock(FileSystem.class, RETURNS_DEEP_STUBS);
-    MapOutputFile mof = mock(MapOutputFile.class);
-    when(mof.getInputFileForWrite(any(TaskID.class), anyLong())).thenReturn(p);
-    OnDiskMapOutput<Text,Text> odmo = spy(new OnDiskMapOutput<Text,Text>(map1ID,
-        id, mm, 100L, job, mof, FETCHER, true, mFs, p));
+    IFileWrappedMapOutput<Text,Text> odmo =
+        spy(new OnDiskMapOutput<Text,Text>(map1ID, mm, 100L, job,
+                                           FETCHER, true, mFs, p));
     when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
         .thenReturn(odmo);
     doNothing().when(mm).waitForResource();