You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2016/05/20 17:53:33 UTC

tez git commit: TEZ-3237. Corrupted shuffle transfers to disk are not detected during transfer (jlowe) (cherry picked from commit 5dd47c6858028ac0187c4fab76b66217bf0d6c56)

Repository: tez
Updated Branches:
  refs/heads/branch-0.8 86fdc0a86 -> fa65f3599


TEZ-3237. Corrupted shuffle transfers to disk are not detected during transfer (jlowe)
(cherry picked from commit 5dd47c6858028ac0187c4fab76b66217bf0d6c56)

Conflicts:

	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fa65f359
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fa65f359
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fa65f359

Branch: refs/heads/branch-0.8
Commit: fa65f3599e82c901ee43cca1a01df0b875ce0ef1
Parents: 86fdc0a
Author: Jason Lowe <jl...@apache.org>
Authored: Fri May 20 17:53:10 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri May 20 17:53:10 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../library/api/TezRuntimeConfiguration.java    | 12 +++++
 .../runtime/library/common/shuffle/Fetcher.java | 17 ++++---
 .../library/common/shuffle/ShuffleUtils.java    | 29 +++++++-----
 .../common/shuffle/impl/ShuffleManager.java     |  6 ++-
 .../orderedgrouped/FetcherOrderedGrouped.java   |  9 +++-
 .../orderedgrouped/ShuffleScheduler.java        |  8 +++-
 .../runtime/library/common/sort/impl/IFile.java | 49 ++++++++++++++++++--
 .../common/sort/impl/IFileInputStream.java      |  7 ++-
 .../library/input/OrderedGroupedKVInput.java    |  1 +
 .../runtime/library/input/UnorderedKVInput.java |  1 +
 .../library/common/shuffle/TestFetcher.java     | 15 +++---
 .../common/shuffle/TestShuffleUtils.java        | 27 +++++++++++
 .../shuffle/orderedgrouped/TestFetcher.java     | 27 +++++++----
 .../library/common/sort/impl/TestIFile.java     | 40 ++++++++++++++++
 15 files changed, 207 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 68cf9f6..794d06e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3237. Corrupted shuffle transfers to disk are not detected during transfer
   TEZ-3246. Improve diagnostics when DAG killed by user
   TEZ-3258. Jvm Checker does not ignore DisableExplicitGC when checking JVM GC options.
   TEZ-3256. [Backport HADOOP-11032] Remove Guava Stopwatch dependency
@@ -450,6 +451,7 @@ Release 0.7.2: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-3237. Corrupted shuffle transfers to disk are not detected during transfer
   TEZ-3258. Jvm Checker does not ignore DisableExplicitGC when checking JVM GC options.
   TEZ-3256. [Backport HADOOP-11032] Remove Guava Stopwatch dependency
   TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization

http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index caad6ef..08f76f2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -373,6 +373,17 @@ public class TezRuntimeConfiguration {
       "shuffle.ssl.enable";
   public static final boolean TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT = false;
 
+  /**
+   * Controls verification of data checksums when fetching data directly to
+   * disk. Enabling verification allows the fetcher to detect corrupted data
+   * and report the failure against the upstream task before the data reaches
+   * the Processor and causes the fetching task to fail.
+   */
+  @ConfigurationProperty(type = "boolean")
+  public static final String TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM =
+      TEZ_RUNTIME_PREFIX + "shuffle.fetch.verify-disk-checksum";
+  public static final boolean TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT = true;
+
   @ConfigurationProperty(type = "float")
   public static final String TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT = TEZ_RUNTIME_PREFIX +
       "shuffle.fetch.buffer.percent";
@@ -541,6 +552,7 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
+    tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM);
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT);
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);

http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 261f2e7..a0705fc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -128,6 +128,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
 
   private final boolean asyncHttp;
 
+  private final boolean verifyDiskChecksum;
+
   private final boolean isDebugEnabled = LOG.isDebugEnabled();
 
   private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
@@ -139,8 +141,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       boolean localDiskFetchEnabled,
       boolean sharedFetchEnabled,
       String localHostname,
-      int shufflePort, boolean asyncHttp) {
+      int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum) {
     this.asyncHttp = asyncHttp;
+    this.verifyDiskChecksum = verifyDiskChecksum;
     this.fetcherCallback = fetcherCallback;
     this.inputManager = inputManager;
     this.jobTokenSecretMgr = jobTokenSecretManager;
@@ -804,7 +807,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       } else if (fetchedInput.getType() == Type.DISK) {
         ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
           (host +":" +port), input, compressedLength, decompressedLength, LOG,
-          fetchedInput.getInputAttemptIdentifier().toString());
+          fetchedInput.getInputAttemptIdentifier().toString(),
+          ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
       } else {
         throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " +
             fetchedInput);
@@ -971,10 +975,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         HttpConnectionParams params, FetchedInputAllocator inputManager,
         ApplicationId appId, int dagIdentifier,  JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
         Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort,
-        boolean asyncHttp) {
+        boolean asyncHttp, boolean verifyDiskChecksum) {
       this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
           jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
-          false, localHostname, shufflePort, asyncHttp);
+          false, localHostname, shufflePort, asyncHttp, verifyDiskChecksum);
     }
 
     public FetcherBuilder(FetcherCallback fetcherCallback,
@@ -983,10 +987,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         Configuration conf, RawLocalFileSystem localFs,
         LocalDirAllocator localDirAllocator, Path lockPath,
         boolean localDiskFetchEnabled, boolean sharedFetchEnabled,
-        String localHostname, int shufflePort, boolean asyncHttp) {
+        String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum) {
       this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
           jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
-          lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp);
+          lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp,
+          verifyDiskChecksum);
     }
 
     public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) {

http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 685503c..ae646ea 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -140,22 +140,27 @@ public class ShuffleUtils {
   }
   
   public static void shuffleToDisk(OutputStream output, String hostIdentifier,
-      InputStream input, long compressedLength, long decompressedLength, Logger LOG, String identifier)
-      throws IOException {
+      InputStream input, long compressedLength, long decompressedLength, Logger LOG, String identifier,
+      boolean ifileReadAhead, int ifileReadAheadLength, boolean verifyChecksum) throws IOException {
     // Copy data to local-disk
     long bytesLeft = compressedLength;
     try {
-      final int BYTES_TO_READ = 64 * 1024;
-      byte[] buf = new byte[BYTES_TO_READ];
-      while (bytesLeft > 0) {
-        int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
-        if (n < 0) {
-          throw new IOException("read past end of stream reading "
-              + identifier);
+      if (verifyChecksum) {
+        bytesLeft -= IFile.Reader.readToDisk(output, input, compressedLength,
+            ifileReadAhead, ifileReadAheadLength);
+      } else {
+        final int BYTES_TO_READ = 64 * 1024;
+        byte[] buf = new byte[BYTES_TO_READ];
+        while (bytesLeft > 0) {
+          int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+          if (n < 0) {
+            throw new IOException("read past end of stream reading "
+                + identifier);
+          }
+          output.write(buf, 0, n);
+          bytesLeft -= n;
+          // metrics.inputBytes(n);
         }
-        output.write(buf, 0, n);
-        bytesLeft -= n;
-        // metrics.inputBytes(n);
       }
 
       if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index b82098e..4f7d348 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -136,6 +136,7 @@ public class ShuffleManager implements FetcherCallback {
   private final CompressionCodec codec;
   private final boolean localDiskFetchEnabled;
   private final boolean sharedFetchEnabled;
+  private final boolean verifyDiskChecksum;
   
   private final int ifileBufferSize;
   private final boolean ifileReadAhead;
@@ -198,6 +199,9 @@ public class ShuffleManager implements FetcherCallback {
         TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
     this.sharedFetchEnabled = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH,
         TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT);
+    this.verifyDiskChecksum = conf.getBoolean(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT);
 
     this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);
     this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
@@ -404,7 +408,7 @@ public class ShuffleManager implements FetcherCallback {
       httpConnectionParams, inputManager, inputContext.getApplicationId(), inputContext.getDagIdentifier(),
         jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
         lockDisk, localDiskFetchEnabled, sharedFetchEnabled,
-        localhostName, shufflePort, asyncHttp);
+        localhostName, shufflePort, asyncHttp, verifyDiskChecksum);
 
     if (codec != null) {
       fetcherBuilder.setCompressionParameters(codec);

http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 51bdf68..bcb75d2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -59,6 +59,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
 
   private final Configuration conf;
   private final boolean localDiskFetchEnabled;
+  private final boolean verifyDiskChecksum;
 
   private final TezCounter connectionErrs;
   private final TezCounter ioErrs;
@@ -125,7 +126,8 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
                                String applicationId,
                                int dagId,
                                boolean asyncHttp,
-                               boolean sslShuffle) {
+                               boolean sslShuffle,
+                               boolean verifyDiskChecksum) {
     this.scheduler = scheduler;
     this.allocator = allocator;
     this.metrics = metrics;
@@ -159,6 +161,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
 
     this.localDiskFetchEnabled = localDiskFetchEnabled;
     this.sslShuffle = sslShuffle;
+    this.verifyDiskChecksum = verifyDiskChecksum;
 
     this.logIdentifier = "fetcher [" + srcNameTrimmed + "] #" + id;
   }
@@ -504,7 +507,9 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
           ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString());
       } else if (mapOutput.getType() == Type.DISK) {
         ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(),
-          input, compressedLength, decompressedLength, LOG, mapOutput.getAttemptIdentifier().toString());
+          input, compressedLength, decompressedLength, LOG,
+          mapOutput.getAttemptIdentifier().toString(),
+          ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
       } else {
         throw new IOException("Unknown mapOutput type while fetching shuffle data:" +
             mapOutput.getType());

http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 2f6e490..c017efb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -235,6 +235,7 @@ class ShuffleScheduler {
   private final float minReqProgressFraction;
   private final float maxAllowedFailedFetchFraction;
   private final boolean checkFailedFetchSinceLastCompletion;
+  private final boolean verifyDiskChecksum;
 
   private volatile Thread shuffleSchedulerThread = null;
 
@@ -388,6 +389,10 @@ class ShuffleScheduler {
         conf.getBoolean(
             TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR, 
             TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT);
+    this.verifyDiskChecksum = conf.getBoolean(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT);
+
     /**
      * Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would
      * be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL.
@@ -1347,7 +1352,8 @@ class ShuffleScheduler {
         shuffleMetrics, exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength,
         codec, conf, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost,
         ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
-        connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle);
+        connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle,
+        verifyDiskChecksum);
   }
 
   private class FetchFutureCallback implements FutureCallback<Void> {

http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index 6d8992e..a20182c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -645,6 +646,43 @@ public class IFile {
       }
     }
 
+    /**
+     * Read entire IFile content to disk.
+     *
+     * @param out the output stream that will receive the data
+     * @param in the input stream containing the IFile data
+     * @param length the amount of data to read from the input
+     * @return the number of bytes copied
+     * @throws IOException
+     */
+    public static long readToDisk(OutputStream out, InputStream in, long length,
+        boolean ifileReadAhead, int ifileReadAheadLength)
+        throws IOException {
+      final int BYTES_TO_READ = 64 * 1024;
+      byte[] buf = new byte[BYTES_TO_READ];
+
+      // copy the IFile header
+      if (length < HEADER.length) {
+        throw new IOException("Missing IFile header");
+      }
+      IOUtils.readFully(in, buf, 0, HEADER.length);
+      verifyHeaderMagic(buf);
+      out.write(buf, 0, HEADER.length);
+      long bytesLeft = length - HEADER.length;
+      @SuppressWarnings("resource")
+      IFileInputStream ifInput = new IFileInputStream(in, bytesLeft,
+          ifileReadAhead, ifileReadAheadLength);
+      while (bytesLeft > 0) {
+        int n = ifInput.readWithChecksum(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+        if (n < 0) {
+          throw new IOException("read past end of stream");
+        }
+        out.write(buf, 0, n);
+        bytesLeft -= n;
+      }
+      return length - bytesLeft;
+    }
+
     public long getLength() {
       return fileLength - checksumIn.getSize();
     }
@@ -784,14 +822,17 @@ public class IFile {
       ++numRecordsRead;
     }
 
-    public static boolean isCompressedFlagEnabled(InputStream in) throws IOException {
-      byte[] header = new byte[HEADER.length];
-      IOUtils.readFully(in, header, 0, HEADER.length);
-
+    private static void verifyHeaderMagic(byte[] header) throws IOException {
       if (!(header[0] == 'T' && header[1] == 'I'
           && header[2] == 'F')) {
         throw new IOException("Not a valid ifile header");
       }
+    }
+
+    public static boolean isCompressedFlagEnabled(InputStream in) throws IOException {
+      byte[] header = new byte[HEADER.length];
+      IOUtils.readFully(in, header, 0, HEADER.length);
+      verifyHeaderMagic(header);
       return (header[3] == 1);
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
index d116242..c5853d4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
@@ -125,7 +125,7 @@ public class IFileInputStream extends InputStream {
     if (curReadahead != null) {
       curReadahead.cancel();
     }
-    if (currentOffset < dataLength) {
+    if (currentOffset < dataLength && !disableChecksumValidation) {
       byte[] t = new byte[Math.min((int)
             (Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
       while (currentOffset < dataLength) {
@@ -300,7 +300,10 @@ public class IFileInputStream extends InputStream {
     return result;
   }
 
-  void disableChecksumValidation() {
+  /**
+   * Disable checksum validation when reading the stream
+   */
+  public void disableChecksumValidation() {
     disableChecksumValidation = true;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 5e367cf..9a2a23e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -354,6 +354,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);

http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index dbbe23ff..ec9a191 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -263,6 +263,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);

http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index 0aa112e..bd0ea0f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -71,7 +71,7 @@ public class TestFetcher {
 
     Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
         ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
-        PORT, false);
+        PORT, false, true);
     builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
     Fetcher fetcher = spy(builder.build());
 
@@ -89,7 +89,7 @@ public class TestFetcher {
     // when enabled and hostname does not match use http fetch.
     builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
         ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
-        PORT, false);
+        PORT, false, true);
     builder.assignWork(HOST + "_OTHER", PORT, 0, Arrays.asList(srcAttempts));
     fetcher = spy(builder.build());
 
@@ -104,7 +104,8 @@ public class TestFetcher {
 
     // when enabled and port does not match use http fetch.
     builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
-        ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT, false);
+        ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
+        PORT, false, true);
     builder.assignWork(HOST, PORT + 1, 0, Arrays.asList(srcAttempts));
     fetcher = spy(builder.build());
 
@@ -121,7 +122,7 @@ public class TestFetcher {
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
     builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
         ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST,
-        PORT, false);
+        PORT, false, true);
     builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
     fetcher = spy(builder.build());
 
@@ -154,7 +155,8 @@ public class TestFetcher {
     int partition = 42;
     FetcherCallback callback = mock(FetcherCallback.class);
     Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
-        ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT, false);
+        ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT,
+        false, true);
     builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
     Fetcher fetcher = spy(builder.build());
 
@@ -272,7 +274,8 @@ public class TestFetcher {
     int partition = 42;
     FetcherCallback callback = mock(FetcherCallback.class);
     Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
-        ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT, false);
+        ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT,
+        false, true);
     builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
     Fetcher fetcher = spy(builder.build());
     fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts)));

http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index 4ac1bca..c542030 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -4,6 +4,7 @@ import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -23,6 +24,7 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
@@ -33,9 +35,12 @@ import org.junit.Test;
 import org.slf4j.Logger;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Random;
@@ -283,4 +288,26 @@ public class TestShuffleUtils {
       Assert.assertTrue(e.getMessage().contains(codecErrorMsg));
     }
   }
+
+  @Test
+  public void testShuffleToDiskChecksum() throws Exception {
+    // verify sending a stream of zeroes without checksum validation
+    // does not trigger an exception
+    byte[] bogusData = new byte[1000];
+    Arrays.fill(bogusData, (byte) 0);
+    ByteArrayInputStream in = new ByteArrayInputStream(bogusData);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ShuffleUtils.shuffleToDisk(baos, "somehost", in,
+        bogusData.length, 2000, mock(Logger.class), "identifier", false, 0, false);
+    Assert.assertArrayEquals(bogusData, baos.toByteArray());
+
+    // verify sending same stream of zeroes with validation generates an exception
+    in.reset();
+    try {
+      ShuffleUtils.shuffleToDisk(mock(OutputStream.class), "somehost", in,
+          bogusData.length, 2000, mock(Logger.class), "identifier", false, 0, true);
+      Assert.fail("shuffle was supposed to throw!");
+    } catch (IOException e) {
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 89d35f4..310f1b2 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -127,7 +127,8 @@ public class TestFetcher {
         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
             null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+            false, false, true);
 
     fetcher.call();
     verify(scheduler).getMapsForHost(mapHost);
@@ -155,7 +156,8 @@ public class TestFetcher {
         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
             null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+            false, false, true);
 
     // when local mode is enabled and host and port matches use local fetch
     FetcherOrderedGrouped spyFetcher = spy(fetcher);
@@ -172,7 +174,8 @@ public class TestFetcher {
         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
             null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+            false, false ,true);
     spyFetcher = spy(fetcher);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
 
@@ -187,7 +190,8 @@ public class TestFetcher {
         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
             null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+            false, false, true);
     spyFetcher = spy(fetcher);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
 
@@ -201,7 +205,8 @@ public class TestFetcher {
     fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
         null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
         wrongLengthErrsCounter, badIdErrsCounter,
-        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
+        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+        false, false, true);
     spyFetcher = spy(fetcher);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
 
@@ -225,7 +230,8 @@ public class TestFetcher {
     MapHost host = new MapHost(HOST, PORT, 1);
     FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
         null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
-        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
+        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+        false, false, true);
     FetcherOrderedGrouped spyFetcher = spy(fetcher);
 
 
@@ -369,7 +375,8 @@ public class TestFetcher {
     final MapHost host = new MapHost(HOST, PORT, 1);
     FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
         null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
-        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
+        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+        false, false, true);
     final FetcherOrderedGrouped fetcher = spy(mockFetcher);
 
 
@@ -460,7 +467,8 @@ public class TestFetcher {
             false, 0,
             null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, true, false);
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+            true, false, true);
     final FetcherOrderedGrouped fetcher = spy(mockFetcher);
     fetcher.remaining = new LinkedHashMap<String, InputAttemptIdentifier>();
     final List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
@@ -527,7 +535,8 @@ public class TestFetcher {
         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
             null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
-            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
+            wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+            false, false, true);
     fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts)));
     Assert.assertEquals(expectedSrcAttempts.length, fetcher.remaining.size());
     Iterator<Entry<String, InputAttemptIdentifier>> iterator = fetcher.remaining.entrySet().iterator();

http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index 1c269bd..24acc40 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -19,8 +19,10 @@
 package org.apache.tez.runtime.library.common.sort.impl;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
@@ -28,6 +30,7 @@ import java.util.Random;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -375,6 +378,43 @@ public class TestIFile {
     readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
   }
 
+  @Test(timeout = 20000)
+  public void testReadToDisk() throws IOException {
+    // verify sending a stream of zeroes generates an error
+    byte[] zeroData = new byte[1000];
+    Arrays.fill(zeroData, (byte) 0);
+    ByteArrayInputStream in = new ByteArrayInputStream(zeroData);
+    try {
+      IFile.Reader.readToDisk(new ByteArrayOutputStream(), in, zeroData.length, false, 0);
+      fail("Exception should have been thrown");
+    } catch (IOException e) {
+    }
+
+    // verify sending same stream of zeroes with a valid IFile header still
+    // generates an error
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    baos.write(IFile.HEADER);
+    baos.write(zeroData);
+    try {
+      IFile.Reader.readToDisk(new ByteArrayOutputStream(),
+          new ByteArrayInputStream(baos.toByteArray()), zeroData.length, false, 0);
+      fail("Exception should have been thrown");
+    } catch (IOException e) {
+      assertTrue(e instanceof ChecksumException);
+    }
+
+    // verify valid data is copied properly
+    List<KVPair> data = KVDataGen.generateTestData(true, 0);
+    Writer writer = writeTestFile(false, false, data, codec);
+    baos.reset();
+    IFile.Reader.readToDisk(baos, localFs.open(outputPath), writer.getCompressedLength(),
+        false, 0);
+    byte[] diskData = baos.toByteArray();
+    Reader reader = new Reader(new ByteArrayInputStream(diskData), diskData.length,
+        codec, null, null, false, 0, 1024);
+    verifyData(reader, data);
+    reader.close();
+  }
 
   /**
    * Test different options (RLE, repeat keys, compression) on reader/writer