You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by ge...@apache.org on 2014/12/16 04:20:38 UTC
hadoop git commit: MAPREDUCE-6166. Reducers do not validate checksum
of map outputs when fetching directly to disk. (Eric Payne via gera)
Repository: hadoop
Updated Branches:
refs/heads/trunk fb20797b6 -> af006937e
MAPREDUCE-6166. Reducers do not validate checksum of map outputs when fetching directly to disk. (Eric Payne via gera)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/af006937
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/af006937
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/af006937
Branch: refs/heads/trunk
Commit: af006937e8ba82f98f468dc7375fe89c2e0a7912
Parents: fb20797
Author: Gera Shegalov <ge...@apache.org>
Authored: Mon Dec 15 19:08:59 2014 -0800
Committer: Gera Shegalov <ge...@apache.org>
Committed: Mon Dec 15 19:08:59 2014 -0800
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../mapreduce/task/reduce/OnDiskMapOutput.java | 9 ++-
.../mapreduce/task/reduce/TestFetcher.java | 72 +++++++++++++++++++-
3 files changed, 81 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af006937/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 191526a..945d95c 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -279,6 +279,9 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-4879. TeraOutputFormat may overwrite an existing output
directory. (gera)
+ MAPREDUCE-6166. Reducers do not validate checksum of map outputs when
+ fetching directly to disk. (Eric Payne via gera)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af006937/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
index 6e0e92b..8275fd0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
@@ -24,6 +24,7 @@ import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,6 +33,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.MapOutputFile;
@@ -52,6 +54,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
private final MergeManagerImpl<K, V> merger;
private final OutputStream disk;
private long compressedSize;
+ private final Configuration conf;
public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
MergeManagerImpl<K,V> merger, long size,
@@ -60,7 +63,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
int fetcher, boolean primaryMapOutput)
throws IOException {
this(mapId, reduceId, merger, size, conf, mapOutputFile, fetcher,
- primaryMapOutput, FileSystem.getLocal(conf),
+ primaryMapOutput, FileSystem.getLocal(conf).getRaw(),
mapOutputFile.getInputFileForWrite(mapId.getTaskID(), size));
}
@@ -77,6 +80,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
this.outputPath = outputPath;
tmpOutputPath = getTempPath(outputPath, fetcher);
disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
+ this.conf = conf;
}
@VisibleForTesting
@@ -89,13 +93,14 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
long compressedLength, long decompressedLength,
ShuffleClientMetrics metrics,
Reporter reporter) throws IOException {
+ input = new IFileInputStream(input, compressedLength, conf);
// Copy data to local-disk
long bytesLeft = compressedLength;
try {
final int BYTES_TO_READ = 64 * 1024;
byte[] buf = new byte[BYTES_TO_READ];
while (bytesLeft > 0) {
- int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+ int n = ((IFileInputStream)input).readWithChecksum(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
if (n < 0) {
throw new IOException("read past end of stream reading " +
getMapId());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af006937/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index 7736c48..929c0ae 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -24,6 +24,7 @@ import java.lang.Void;
import java.net.HttpURLConnection;
+import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.MapOutputFile;
@@ -54,6 +55,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.IFileOutputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
@@ -88,6 +90,7 @@ public class TestFetcher {
final MapHost host = new MapHost("localhost", "http://localhost:8080/");
final TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
final TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
+ FileSystem fs = null;
@Rule public TestName name = new TestName();
@@ -118,8 +121,11 @@ public class TestFetcher {
}
@After
- public void teardown() {
+ public void teardown() throws IllegalArgumentException, IOException {
LOG.info("<<<< " + name.getMethodName());
+ if (fs != null) {
+ fs.delete(new Path(name.getMethodName()),true);
+ }
}
@Test
@@ -432,6 +438,70 @@ public class TestFetcher {
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
}
+ @Test
+ public void testCorruptedIFile() throws Exception {
+ final int fetcher = 7;
+ Path onDiskMapOutputPath = new Path(name.getMethodName() + "/foo");
+ Path shuffledToDisk =
+ OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher);
+ fs = FileSystem.getLocal(job).getRaw();
+ MapOutputFile mof = mock(MapOutputFile.class);
+ OnDiskMapOutput<Text,Text> odmo = new OnDiskMapOutput<Text,Text>(map1ID,
+ id, mm, 100L, job, mof, fetcher, true, fs, onDiskMapOutputPath);
+
+ String mapData = "MAPDATA12345678901234567890";
+
+ ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 14, 10, 1);
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bout);
+ IFileOutputStream ios = new IFileOutputStream(dos);
+ header.write(dos);
+
+ int headerSize = dos.size();
+ try {
+ ios.write(mapData.getBytes());
+ } finally {
+ ios.close();
+ }
+
+ int dataSize = bout.size() - headerSize;
+
+ // Ensure that the OnDiskMapOutput shuffler can successfully read the data.
+ MapHost host = new MapHost("TestHost", "http://test/url");
+ ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
+ try {
+ // Read past the shuffle header.
+ bin.read(new byte[headerSize], 0, headerSize);
+ odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
+ } finally {
+ bin.close();
+ }
+
+ // Now corrupt the IFile data.
+ byte[] corrupted = bout.toByteArray();
+ corrupted[headerSize + (dataSize / 2)] = 0x0;
+
+ try {
+ bin = new ByteArrayInputStream(corrupted);
+ // Read past the shuffle header.
+ bin.read(new byte[headerSize], 0, headerSize);
+ odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
+ fail("OnDiskMapOutput.shuffle didn't detect the corrupted map partition file");
+ } catch(ChecksumException e) {
+ LOG.info("The expected checksum exception was thrown.", e);
+ } finally {
+ bin.close();
+ }
+
+ // Ensure that the shuffled file can be read.
+ IFileInputStream iFin = new IFileInputStream(fs.open(shuffledToDisk), dataSize, job);
+ try {
+ iFin.read(new byte[dataSize], 0, dataSize);
+ } finally {
+ iFin.close();
+ }
+ }
+
@Test(timeout=10000)
public void testInterruptInMemory() throws Exception {
final int FETCHER = 2;