Posted to common-commits@hadoop.apache.org by ge...@apache.org on 2014/12/16 04:20:38 UTC

hadoop git commit: MAPREDUCE-6166. Reducers do not validate checksum of map outputs when fetching directly to disk. (Eric Payne via gera)

Repository: hadoop
Updated Branches:
  refs/heads/trunk fb20797b6 -> af006937e


MAPREDUCE-6166. Reducers do not validate checksum of map outputs when fetching directly to disk. (Eric Payne via gera)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/af006937
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/af006937
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/af006937

Branch: refs/heads/trunk
Commit: af006937e8ba82f98f468dc7375fe89c2e0a7912
Parents: fb20797
Author: Gera Shegalov <ge...@apache.org>
Authored: Mon Dec 15 19:08:59 2014 -0800
Committer: Gera Shegalov <ge...@apache.org>
Committed: Mon Dec 15 19:08:59 2014 -0800

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 +
 .../mapreduce/task/reduce/OnDiskMapOutput.java  |  9 ++-
 .../mapreduce/task/reduce/TestFetcher.java      | 72 +++++++++++++++++++-
 3 files changed, 81 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/af006937/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 191526a..945d95c 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -279,6 +279,9 @@ Release 2.7.0 - UNRELEASED
     MAPREDUCE-4879. TeraOutputFormat may overwrite an existing output
     directory. (gera)
 
+    MAPREDUCE-6166. Reducers do not validate checksum of map outputs when
+    fetching directly to disk. (Eric Payne via gera)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/af006937/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
index 6e0e92b..8275fd0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
@@ -24,6 +24,7 @@ import java.io.OutputStream;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -32,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.io.IOUtils;
 
+import org.apache.hadoop.mapred.IFileInputStream;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.MapOutputFile;
@@ -52,6 +54,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
   private final MergeManagerImpl<K, V> merger;
   private final OutputStream disk; 
   private long compressedSize;
+  private final Configuration conf;
 
   public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                          MergeManagerImpl<K,V> merger, long size,
@@ -60,7 +63,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
                          int fetcher, boolean primaryMapOutput)
       throws IOException {
     this(mapId, reduceId, merger, size, conf, mapOutputFile, fetcher,
-        primaryMapOutput, FileSystem.getLocal(conf),
+        primaryMapOutput, FileSystem.getLocal(conf).getRaw(),
         mapOutputFile.getInputFileForWrite(mapId.getTaskID(), size));
   }
 
@@ -77,6 +80,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
     this.outputPath = outputPath;
     tmpOutputPath = getTempPath(outputPath, fetcher);
     disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
+    this.conf = conf;
   }
 
   @VisibleForTesting
@@ -89,13 +93,14 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
                       long compressedLength, long decompressedLength,
                       ShuffleClientMetrics metrics,
                       Reporter reporter) throws IOException {
+    input = new IFileInputStream(input, compressedLength, conf);
     // Copy data to local-disk
     long bytesLeft = compressedLength;
     try {
       final int BYTES_TO_READ = 64 * 1024;
       byte[] buf = new byte[BYTES_TO_READ];
       while (bytesLeft > 0) {
-        int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+        int n = ((IFileInputStream)input).readWithChecksum(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
         if (n < 0) {
           throw new IOException("read past end of stream reading " + 
                                 getMapId());

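Note on the hunk above: the copy loop now drains the fetch stream through an IFileInputStream sized to the compressed segment, so the trailing IFile checksum is verified while the bytes are spilled to disk instead of being written blindly. The constructor change to FileSystem.getLocal(conf).getRaw() writes through the raw local filesystem, presumably so ChecksumFileSystem does not layer a second .crc sidecar over data that the IFile trailer already protects. A minimal sketch of the pattern follows; it uses the real IFileInputStream API from the patch, but the class and method names are illustrative only:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.IFileInputStream;

    class ChecksummedSpill {  // hypothetical helper, for illustration
      static void copyVerified(InputStream rawIn, OutputStream disk,
          long compressedLength, Configuration conf) throws IOException {
        // Wrap the raw stream; IFileInputStream validates the trailing
        // CRC once the last data byte has been read, throwing
        // org.apache.hadoop.fs.ChecksumException on a mismatch.
        IFileInputStream in =
            new IFileInputStream(rawIn, compressedLength, conf);
        byte[] buf = new byte[64 * 1024];
        long bytesLeft = compressedLength;
        while (bytesLeft > 0) {
          int n = in.readWithChecksum(buf, 0,
              (int) Math.min(bytesLeft, buf.length));
          if (n < 0) {
            throw new IOException("premature end of stream");
          }
          // The checksum trailer is passed through, so the spilled file
          // keeps its own CRC for any later re-read (e.g. by the merge).
          disk.write(buf, 0, n);
          bytesLeft -= n;
        }
      }
    }
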
http://git-wip-us.apache.org/repos/asf/hadoop/blob/af006937/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index 7736c48..929c0ae 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -24,6 +24,7 @@ import java.lang.Void;
 
 import java.net.HttpURLConnection;
 
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.MapOutputFile;
@@ -54,6 +55,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.IFileInputStream;
 import org.apache.hadoop.mapred.IFileOutputStream;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
@@ -88,6 +90,7 @@ public class TestFetcher {
   final MapHost host = new MapHost("localhost", "http://localhost:8080/");
   final TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
   final TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
+  FileSystem fs = null;
 
   @Rule public TestName name = new TestName();
 
@@ -118,8 +121,11 @@ public class TestFetcher {
   }
 
   @After
-  public void teardown() {
+  public void teardown() throws IllegalArgumentException, IOException {
     LOG.info("<<<< " + name.getMethodName());
+    if (fs != null) {
+      fs.delete(new Path(name.getMethodName()),true);
+    }
   }
   
   @Test
@@ -432,6 +438,70 @@ public class TestFetcher {
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
   }
 
+  @Test
+  public void testCorruptedIFile() throws Exception {
+    final int fetcher = 7;
+    Path onDiskMapOutputPath = new Path(name.getMethodName() + "/foo");
+    Path shuffledToDisk =
+        OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher);
+    fs = FileSystem.getLocal(job).getRaw();
+    MapOutputFile mof = mock(MapOutputFile.class);
+    OnDiskMapOutput<Text,Text> odmo = new OnDiskMapOutput<Text,Text>(map1ID,
+        id, mm, 100L, job, mof, fetcher, true, fs, onDiskMapOutputPath);
+
+    String mapData = "MAPDATA12345678901234567890";
+
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 14, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bout);
+    IFileOutputStream ios = new IFileOutputStream(dos);
+    header.write(dos);
+
+    int headerSize = dos.size();
+    try {
+      ios.write(mapData.getBytes());
+    } finally {
+      ios.close();
+    }
+
+    int dataSize = bout.size() - headerSize;
+
+    // Ensure that the OnDiskMapOutput shuffler can successfully read the data.
+    MapHost host = new MapHost("TestHost", "http://test/url");
+    ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
+    try {
+      // Read past the shuffle header.
+      bin.read(new byte[headerSize], 0, headerSize);
+      odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
+    } finally {
+      bin.close();
+    }
+
+    // Now corrupt the IFile data.
+    byte[] corrupted = bout.toByteArray();
+    corrupted[headerSize + (dataSize / 2)] = 0x0;
+
+    try {
+      bin = new ByteArrayInputStream(corrupted);
+      // Read past the shuffle header.
+      bin.read(new byte[headerSize], 0, headerSize);
+      odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
+      fail("OnDiskMapOutput.shuffle didn't detect the corrupted map partition file");
+    } catch(ChecksumException e) {
+      LOG.info("The expected checksum exception was thrown.", e);
+    } finally {
+      bin.close();
+    }
+
+    // Ensure that the shuffled file can be read.
+    IFileInputStream iFin = new IFileInputStream(fs.open(shuffledToDisk), dataSize, job);
+    try {
+      iFin.read(new byte[dataSize], 0, dataSize);
+    } finally {
+      iFin.close();
+    }
+  }
+
   @Test(timeout=10000)
   public void testInterruptInMemory() throws Exception {
     final int FETCHER = 2;
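For reference, the corruption round trip that testCorruptedIFile exercises can be reproduced outside the shuffle path with just the IFile stream classes. The following self-contained sketch (class name and the corrupted byte choice are illustrative) writes a checksummed segment, flips one payload byte, and reads it back; as in the test, the length handed to IFileInputStream covers both the payload and the checksum trailer that IFileOutputStream appends on close():

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.ChecksumException;
    import org.apache.hadoop.mapred.IFileInputStream;
    import org.apache.hadoop.mapred.IFileOutputStream;

    public class IFileChecksumDemo {  // illustrative name
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        IFileOutputStream ios = new IFileOutputStream(bout);
        ios.write("MAPDATA12345678901234567890".getBytes());
        ios.close();  // appends the checksum trailer

        byte[] segment = bout.toByteArray();
        segment[segment.length / 2] ^= 0x7f;  // corrupt one payload byte

        IFileInputStream iin = new IFileInputStream(
            new ByteArrayInputStream(segment), segment.length,
            new Configuration());
        try {
          iin.readWithChecksum(new byte[segment.length], 0, segment.length);
          System.out.println("BUG: corruption went undetected");
        } catch (ChecksumException expected) {
          System.out.println("Corruption detected: " + expected.getMessage());
        } finally {
          iin.close();
        }
      }
    }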