You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/06/08 19:19:15 UTC
[17/50] hadoop git commit: MAPREDUCE-6174. Combine common stream code
into parent class for InMemoryMapOutput and OnDiskMapOutput. (Eric Payne via
gera)
MAPREDUCE-6174. Combine common stream code into parent class for InMemoryMapOutput and OnDiskMapOutput. (Eric Payne via gera)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2e585863
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2e585863
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2e585863
Branch: refs/heads/YARN-2928
Commit: 2e58586316f548a8dd2effbc15d0729d1a622fe3
Parents: 2bff83c
Author: Gera Shegalov <ge...@apache.org>
Authored: Wed Jun 3 16:26:45 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon Jun 8 09:43:15 2015 -0700
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../task/reduce/IFileWrappedMapOutput.java | 72 ++++++++++++++++++++
.../task/reduce/InMemoryMapOutput.java | 26 ++-----
.../mapreduce/task/reduce/MergeManagerImpl.java | 5 +-
.../mapreduce/task/reduce/OnDiskMapOutput.java | 33 +++++----
.../mapreduce/task/reduce/TestFetcher.java | 27 ++++----
6 files changed, 114 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e585863/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index ba94324..5cc08a3 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -358,6 +358,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-5248. Let NNBenchWithoutMR specify the replication factor for
its test (Erik Paulson via jlowe)
+ MAPREDUCE-6174. Combine common stream code into parent class for
+ InMemoryMapOutput and OnDiskMapOutput. (Eric Payne via gera)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e585863/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
new file mode 100644
index 0000000..119db15
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.IFileInputStream;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * Base for map outputs: shuffle() wraps the input in an IFileInputStream and passes it to doShuffle().
+ *
+ * @param <K> key type for map output
+ * @param <V> value type for map output
+ */
+public abstract class IFileWrappedMapOutput<K, V> extends MapOutput<K, V> {
+ private final Configuration conf; // handed to the IFileInputStream built in shuffle()
+ private final MergeManagerImpl<K, V> merger; // exposed to subclasses through getMerger()
+
+ public IFileWrappedMapOutput(
+ Configuration c, MergeManagerImpl<K, V> m, TaskAttemptID mapId,
+ long size, boolean primaryMapOutput) {
+ super(mapId, size, primaryMapOutput);
+ conf = c;
+ merger = m;
+ }
+
+ /**
+ * @return the merge manager that was passed to the constructor
+ */
+ protected MergeManagerImpl<K, V> getMerger() {
+ return merger;
+ }
+
+ protected abstract void doShuffle( // subclass hook; called by shuffle() with the wrapped stream
+ MapHost host, IFileInputStream iFileInputStream,
+ long compressedLength, long decompressedLength,
+ ShuffleClientMetrics metrics, Reporter reporter) throws IOException;
+
+ @Override
+ public void shuffle(MapHost host, InputStream input,
+ long compressedLength, long decompressedLength,
+ ShuffleClientMetrics metrics,
+ Reporter reporter) throws IOException {
+ IFileInputStream iFin =
+ new IFileInputStream(input, compressedLength, conf); // wrap once here so subclasses need not
+ try {
+ this.doShuffle(host, iFin, compressedLength,
+ decompressedLength, metrics, reporter);
+ } finally {
+ iFin.close(); // runs whether doShuffle returns normally or throws
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e585863/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java
index 24fb3bb..9b61ad5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java
@@ -42,10 +42,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
@InterfaceAudience.Private
@InterfaceStability.Unstable
-class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
+class InMemoryMapOutput<K, V> extends IFileWrappedMapOutput<K, V> {
private static final Log LOG = LogFactory.getLog(InMemoryMapOutput.class);
- private Configuration conf;
- private final MergeManagerImpl<K, V> merger;
private final byte[] memory;
private BoundedByteArrayOutputStream byteStream;
// Decompression of map-outputs
@@ -56,9 +54,7 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
MergeManagerImpl<K, V> merger,
int size, CompressionCodec codec,
boolean primaryMapOutput) {
- super(mapId, (long)size, primaryMapOutput);
- this.conf = conf;
- this.merger = merger;
+ super(conf, merger, mapId, (long)size, primaryMapOutput);
this.codec = codec;
byteStream = new BoundedByteArrayOutputStream(size);
memory = byteStream.getBuffer();
@@ -78,15 +74,12 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
}
@Override
- public void shuffle(MapHost host, InputStream input,
+ protected void doShuffle(MapHost host, IFileInputStream iFin,
long compressedLength, long decompressedLength,
ShuffleClientMetrics metrics,
Reporter reporter) throws IOException {
- IFileInputStream checksumIn =
- new IFileInputStream(input, compressedLength, conf);
+ InputStream input = iFin;
- input = checksumIn;
-
// Are map-outputs compressed?
if (codec != null) {
decompressor.reset();
@@ -111,13 +104,6 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
throw new IOException("Unexpected extra bytes from input stream for " +
getMapId());
}
-
- } catch (IOException ioe) {
- // Close the streams
- IOUtils.cleanup(LOG, input);
-
- // Re-throw
- throw ioe;
} finally {
CodecPool.returnDecompressor(decompressor);
}
@@ -125,12 +111,12 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
@Override
public void commit() throws IOException {
- merger.closeInMemoryFile(this);
+ getMerger().closeInMemoryFile(this);
}
@Override
public void abort() {
- merger.unreserve(memory.length);
+ getMerger().unreserve(memory.length);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e585863/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
index f788707..c99a330 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -263,8 +263,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
" is greater than maxSingleShuffleLimit (" +
maxSingleShuffleLimit + ")");
- return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
- jobConf, mapOutputFile, fetcher, true);
+ return new OnDiskMapOutput<K,V>(mapId, this, requestedSize, jobConf,
+ fetcher, true, FileSystem.getLocal(jobConf).getRaw(),
+ mapOutputFile.getInputFileForWrite(mapId.getTaskID(), requestedSize));
}
// Stall shuffle if we are above the memory limit
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e585863/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
index 8275fd0..f22169d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
@@ -18,13 +18,11 @@
package org.apache.hadoop.mapreduce.task.reduce;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,41 +44,46 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
@InterfaceStability.Unstable
-class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
+class OnDiskMapOutput<K, V> extends IFileWrappedMapOutput<K, V> {
private static final Log LOG = LogFactory.getLog(OnDiskMapOutput.class);
private final FileSystem fs;
private final Path tmpOutputPath;
private final Path outputPath;
- private final MergeManagerImpl<K, V> merger;
private final OutputStream disk;
private long compressedSize;
- private final Configuration conf;
+ @Deprecated
public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
MergeManagerImpl<K,V> merger, long size,
JobConf conf,
MapOutputFile mapOutputFile,
int fetcher, boolean primaryMapOutput)
throws IOException {
- this(mapId, reduceId, merger, size, conf, mapOutputFile, fetcher,
+ this(mapId, merger, size, conf, fetcher,
primaryMapOutput, FileSystem.getLocal(conf).getRaw(),
mapOutputFile.getInputFileForWrite(mapId.getTaskID(), size));
}
- @VisibleForTesting
+ @Deprecated
OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
MergeManagerImpl<K,V> merger, long size,
JobConf conf,
MapOutputFile mapOutputFile,
int fetcher, boolean primaryMapOutput,
FileSystem fs, Path outputPath) throws IOException {
- super(mapId, size, primaryMapOutput);
+ this(mapId, merger, size, conf, fetcher, primaryMapOutput, fs, outputPath);
+ }
+
+ OnDiskMapOutput(TaskAttemptID mapId,
+ MergeManagerImpl<K, V> merger, long size,
+ JobConf conf,
+ int fetcher, boolean primaryMapOutput,
+ FileSystem fs, Path outputPath) throws IOException {
+ super(conf, merger, mapId, size, primaryMapOutput);
this.fs = fs;
- this.merger = merger;
this.outputPath = outputPath;
tmpOutputPath = getTempPath(outputPath, fetcher);
disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
- this.conf = conf;
}
@VisibleForTesting
@@ -89,18 +92,18 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
}
@Override
- public void shuffle(MapHost host, InputStream input,
+ protected void doShuffle(MapHost host, IFileInputStream input,
long compressedLength, long decompressedLength,
ShuffleClientMetrics metrics,
Reporter reporter) throws IOException {
- input = new IFileInputStream(input, compressedLength, conf);
// Copy data to local-disk
long bytesLeft = compressedLength;
try {
final int BYTES_TO_READ = 64 * 1024;
byte[] buf = new byte[BYTES_TO_READ];
while (bytesLeft > 0) {
- int n = ((IFileInputStream)input).readWithChecksum(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+ int n = input.readWithChecksum(buf, 0,
+ (int) Math.min(bytesLeft, BYTES_TO_READ));
if (n < 0) {
throw new IOException("read past end of stream reading " +
getMapId());
@@ -117,7 +120,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
disk.close();
} catch (IOException ioe) {
// Close the streams
- IOUtils.cleanup(LOG, input, disk);
+ IOUtils.cleanup(LOG, disk);
// Re-throw
throw ioe;
@@ -139,7 +142,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
fs.rename(tmpOutputPath, outputPath);
CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
getSize(), this.compressedSize);
- merger.closeOnDiskFile(compressAwarePath);
+ getMerger().closeOnDiskFile(compressAwarePath);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e585863/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index a9cd33e..7888007 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -19,9 +19,7 @@
package org.apache.hadoop.mapreduce.task.reduce;
import java.io.FilterInputStream;
-
import java.lang.Void;
-
import java.net.HttpURLConnection;
import org.apache.hadoop.fs.ChecksumException;
@@ -30,13 +28,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskID;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
-import static org.junit.Assert.*;
+import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
@@ -65,10 +62,11 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time;
import org.junit.Test;
-
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import com.nimbusds.jose.util.StringUtils;
+
/**
* Test that the Fetcher does what we expect it to.
*/
@@ -453,9 +451,9 @@ public class TestFetcher {
ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
when(connection.getInputStream()).thenReturn(in);
// 8 < 10 therefore there appear to be extra bytes in the IFileInputStream
- InMemoryMapOutput<Text,Text> mapOut = new InMemoryMapOutput<Text, Text>(
+ IFileWrappedMapOutput<Text,Text> mapOut = new InMemoryMapOutput<Text, Text>(
job, map1ID, mm, 8, null, true );
- InMemoryMapOutput<Text,Text> mapOut2 = new InMemoryMapOutput<Text, Text>(
+ IFileWrappedMapOutput<Text,Text> mapOut2 = new InMemoryMapOutput<Text, Text>(
job, map2ID, mm, 10, null, true );
when(mm.reserve(eq(map1ID), anyLong(), anyInt())).thenReturn(mapOut);
@@ -478,9 +476,9 @@ public class TestFetcher {
Path shuffledToDisk =
OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher);
fs = FileSystem.getLocal(job).getRaw();
- MapOutputFile mof = mock(MapOutputFile.class);
- OnDiskMapOutput<Text,Text> odmo = new OnDiskMapOutput<Text,Text>(map1ID,
- id, mm, 100L, job, mof, fetcher, true, fs, onDiskMapOutputPath);
+ IFileWrappedMapOutput<Text,Text> odmo =
+ new OnDiskMapOutput<Text,Text>(map1ID, mm, 100L, job, fetcher, true,
+ fs, onDiskMapOutputPath);
String mapData = "MAPDATA12345678901234567890";
@@ -538,7 +536,7 @@ public class TestFetcher {
@Test(timeout=10000)
public void testInterruptInMemory() throws Exception {
final int FETCHER = 2;
- InMemoryMapOutput<Text,Text> immo = spy(new InMemoryMapOutput<Text,Text>(
+ IFileWrappedMapOutput<Text,Text> immo = spy(new InMemoryMapOutput<Text,Text>(
job, id, mm, 100, null, true));
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
.thenReturn(immo);
@@ -584,10 +582,9 @@ public class TestFetcher {
Path p = new Path("file:///tmp/foo");
Path pTmp = OnDiskMapOutput.getTempPath(p, FETCHER);
FileSystem mFs = mock(FileSystem.class, RETURNS_DEEP_STUBS);
- MapOutputFile mof = mock(MapOutputFile.class);
- when(mof.getInputFileForWrite(any(TaskID.class), anyLong())).thenReturn(p);
- OnDiskMapOutput<Text,Text> odmo = spy(new OnDiskMapOutput<Text,Text>(map1ID,
- id, mm, 100L, job, mof, FETCHER, true, mFs, p));
+ IFileWrappedMapOutput<Text,Text> odmo =
+ spy(new OnDiskMapOutput<Text,Text>(map1ID, mm, 100L, job,
+ FETCHER, true, mFs, p));
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
.thenReturn(odmo);
doNothing().when(mm).waitForResource();