You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/11/06 07:29:52 UTC

git commit: TEZ-597. Broadcast Input should use the compressed size when creating an On-Disk IFile reader. (sseth)

Updated Branches:
  refs/heads/master 6fddbd01b -> 98ca0091d


TEZ-597. Broadcast Input should use the compressed size when creating an
On-Disk IFile reader. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/98ca0091
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/98ca0091
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/98ca0091

Branch: refs/heads/master
Commit: 98ca0091d9c119ea8a3f2cf21ab4db0de2a77c9b
Parents: 6fddbd0
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Nov 5 22:29:35 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Nov 5 22:29:35 2013 -0800

----------------------------------------------------------------------
 .../tez/mapreduce/hadoop/MRJobConfig.java       | 26 ------------
 .../broadcast/input/BroadcastInputManager.java  | 19 +++++----
 .../broadcast/input/BroadcastKVReader.java      |  4 +-
 .../BroadcastShuffleInputEventHandler.java      |  2 +-
 .../input/BroadcastShuffleManager.java          | 12 +++++-
 .../task/local/output/TezTaskOutputFiles.java   |  1 -
 .../library/output/OnFileUnorderedKVOutput.java |  2 +-
 .../shuffle/common/DiskFetchedInput.java        | 29 +++----------
 .../library/shuffle/common/FetchedInput.java    | 16 +++++---
 .../shuffle/common/FetchedInputAllocator.java   |  2 +-
 .../runtime/library/shuffle/common/Fetcher.java | 43 ++++++++++++++------
 .../shuffle/common/MemoryFetchedInput.java      | 11 ++---
 .../input/TestBroadcastInputManager.java        | 11 ++---
 13 files changed, 85 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index 940b1e0..0974080 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -652,30 +652,4 @@ public interface MRJobConfig {
   public static final String MR_TEZ_SPLITS_VIA_EVENTS = MR_TEZ_PREFIX + "splits.via.events";
   public static final boolean MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT = true;
 
-  // Stage specific properties
-  // Format of each property is mapred.ireducer.stage.<stage-num>.<suffix>
-  // where suffix is one of MRR_INTERMEDIATE_STAGE_* fields defined below.
-//  public static final String MRR_INTERMEDIATE_STAGE_TASKS = "tasks";
-//  public static final String MRR_INTERMEDIATE_STAGE_CLASS = "class";
-//  public static final String
-//      MRR_INTERMEDIATE_STAGE_PARTITIONER_CLASS = "partitioner.class";
-//  public static final String
-//      MRR_INTERMEDIATE_STAGE_COMBINER_CLASS = "combiner.class";
-//  public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_COMPRESS =
-//      "output.compress";
-//  public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_COMPRESSION_CODEC =
-//      "output.compression.codec";
-//  public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_KEY_CLASS =
-//      "key.class";
-//  public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_KEY_COMPARATOR_CLASS
-//    = "key.comparator.class";
-//  public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_VALUE_CLASS =
-//      "value.class";
-//  public static final String MRR_INTERMEDIATE_STAGE_SPECULATE =
-//      "speculate";
-//  public static final String MRR_INTERMEDIATE_STAGE_MEMORY_MB =
-//      "memory.mb";
-//  public static final String MRR_INTERMEDIATE_STAGE_CHILD_JAVA_OPTS =
-//      "child.java.opts";
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
index 512db50..456b8b6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
@@ -89,16 +89,17 @@ public class BroadcastInputManager implements FetchedInputAllocator,
   }
 
   @Override
-  public synchronized FetchedInput allocate(long size,
+  public synchronized FetchedInput allocate(long actualSize, long compressedSize,
       InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
-    if (size > maxSingleShuffleLimit
-        || this.usedMemory + size > this.memoryLimit) {
-      return new DiskFetchedInput(size, inputAttemptIdentifier, this, conf,
-          localDirAllocator, fileNameAllocator);
+    if (actualSize > maxSingleShuffleLimit
+        || this.usedMemory + actualSize > this.memoryLimit) {
+      return new DiskFetchedInput(actualSize, compressedSize,
+          inputAttemptIdentifier, this, conf, localDirAllocator,
+          fileNameAllocator);
     } else {
-      this.usedMemory += size;
-      LOG.info("Used memory after allocating " + size  + " : " + usedMemory);
-      return new MemoryFetchedInput(size, inputAttemptIdentifier, this);
+      this.usedMemory += actualSize;
+      LOG.info("Used memory after allocating " + actualSize  + " : " + usedMemory);
+      return new MemoryFetchedInput(actualSize, compressedSize, inputAttemptIdentifier, this);
     }
   }
 
@@ -130,7 +131,7 @@ public class BroadcastInputManager implements FetchedInputAllocator,
     case DISK:
       break;
     case MEMORY:
-      unreserve(fetchedInput.getSize());
+      unreserve(fetchedInput.getActualSize());
       break;
     default:
       throw new TezUncheckedException("InputType: " + fetchedInput.getType()

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
index 0b12a53..da74ebd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
@@ -178,10 +178,10 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
       MemoryFetchedInput mfi = (MemoryFetchedInput) fetchedInput;
 
       return new InMemoryReader(null, mfi.getInputAttemptIdentifier(),
-          mfi.getBytes(), 0, (int) mfi.getSize());
+          mfi.getBytes(), 0, (int) mfi.getActualSize());
     } else {
       return new IFile.Reader(fetchedInput.getInputStream(),
-          fetchedInput.getSize(), codec, null, ifileReadAhead,
+          fetchedInput.getCompressedSize(), codec, null, ifileReadAhead,
           ifileReadAheadLength, ifileBufferSize);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
index f9976d6..a7a12ef 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
@@ -101,7 +101,7 @@ public class BroadcastShuffleInputEventHandler {
           shufflePayload.getPathComponent());
       if (shufflePayload.hasData()) {
         DataProto dataProto = shufflePayload.getData();
-        FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(), srcAttemptIdentifier);
+        FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(), dataProto.getCompressedLength(), srcAttemptIdentifier);
         moveDataToFetchedInput(dataProto, fetchedInput);
         shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index 7246359..a4acff6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -465,7 +465,14 @@ public class BroadcastShuffleManager implements FetcherCallback {
       InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
     // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
     // For now, reporting immediately.
-    LOG.info("Fetch failed for src: " + srcAttemptIdentifier + "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: " + connectFailed);
+    LOG.info("Fetch failed for src: " + srcAttemptIdentifier
+        + "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: "
+        + connectFailed);
+    if (srcAttemptIdentifier == null) {
+      String message = "Received fetchFailure for an unknown src (null)";
+      LOG.fatal(message);
+      inputContext.fatalError(null, message);
+    } else {
     InputReadErrorEvent readError = new InputReadErrorEvent(
         "Fetch failure while fetching from "
             + TezRuntimeUtils.getTaskAttemptIdentifier(
@@ -478,6 +485,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
     List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
     failedEvents.add(readError);
     inputContext.sendEvents(failedEvents);
+    }
   }
   /////////////////// End of Methods from FetcherCallbackHandler
 
@@ -564,7 +572,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
   private class NullFetchedInput extends FetchedInput {
 
     public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
-      super(Type.MEMORY, -1, inputAttemptIdentifier, null);
+      super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
index 847a0bf..5111eef 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
@@ -105,7 +105,6 @@ public class TezTaskOutputFiles extends TezTaskOutput {
    * @throws IOException
    */
   public Path getOutputFileForWrite() throws IOException {
-    // TODO how to write 2 different broadcast outputs?????
     Path attemptOutput =
       new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
     return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), conf);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 658e993..5c49da3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -101,7 +101,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
     DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
         .newBuilder();
     
-    LOG.info("Closing KVOutput: RawLegnth: " + this.kvWriter.getRawLength()
+    LOG.info("Closing KVOutput: RawLength: " + this.kvWriter.getRawLength()
         + ", CompressedLength: " + this.kvWriter.getCompressedLength());
 
     if (dataViaEventsEnabled && outputGenerated && this.kvWriter.getCompressedLength() <= dataViaEventsMaxSize) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
index 3cdf20e..9aeb65d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
@@ -28,10 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
 
 import com.google.common.base.Preconditions;
@@ -43,25 +40,17 @@ public class DiskFetchedInput extends FetchedInput {
   private final FileSystem localFS;
   private final Path tmpOutputPath;
   private final Path outputPath;
-  
-  private static final long checkSumSize; 
 
-  static {
-    DataChecksum sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 
-        Integer.MAX_VALUE);
-    checkSumSize = sum.getChecksumSize();
-  }
-  
-  public DiskFetchedInput(long size,
+  public DiskFetchedInput(long actualSize, long compressedSize,
       InputAttemptIdentifier inputAttemptIdentifier,
       FetchedInputCallback callbackHandler, Configuration conf,
       LocalDirAllocator localDirAllocator, TezTaskOutputFiles filenameAllocator)
       throws IOException {
-    super(Type.DISK, size, inputAttemptIdentifier, callbackHandler);
+    super(Type.DISK, actualSize, compressedSize, inputAttemptIdentifier, callbackHandler);
 
     this.localFS = FileSystem.getLocal(conf);
     this.outputPath = filenameAllocator.getInputFileForWrite(
-        this.inputAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), size);
+        this.inputAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), actualSize);
     this.tmpOutputPath = outputPath.suffix(String.valueOf(id));
   }
 
@@ -69,13 +58,6 @@ public class DiskFetchedInput extends FetchedInput {
   public OutputStream getOutputStream() throws IOException {
     return localFS.create(tmpOutputPath);
   }
-  
-  // Assumes that the file written to disk is an IFile that has a checksum 
-  // at the end. The size in super is the real data size.
-  @Override
-  public long getSize() {
-    return super.getSize() + checkSumSize;
-  }
 
   @Override
   public InputStream getInputStream() throws IOException {
@@ -123,7 +105,8 @@ public class DiskFetchedInput extends FetchedInput {
   @Override
   public String toString() {
     return "DiskFetchedInput [outputPath=" + outputPath
-        + ", inputAttemptIdentifier=" + inputAttemptIdentifier + ", size="
-        + size + ", type=" + type + ", id=" + id + ", state=" + state + "]";
+        + ", inputAttemptIdentifier=" + inputAttemptIdentifier
+        + ", actualSize=" + actualSize + ",compressedSize=" + compressedSize
+        + ", type=" + type + ", id=" + id + ", state=" + state + "]";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
index 8f3c407..0bb765d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
@@ -42,17 +42,19 @@ public abstract class FetchedInput {
   private static AtomicInteger ID_GEN = new AtomicInteger(0);
 
   protected InputAttemptIdentifier inputAttemptIdentifier;
-  protected final long size;
+  protected final long actualSize;
+  protected final long compressedSize;
   protected final Type type;
   protected final FetchedInputCallback callback;
   protected final int id;
   protected State state;
 
-  public FetchedInput(Type type, long size,
+  public FetchedInput(Type type, long actualSize, long compressedSize,
       InputAttemptIdentifier inputAttemptIdentifier,
       FetchedInputCallback callbackHandler) {
     this.type = type;
-    this.size = size;
+    this.actualSize = actualSize;
+    this.compressedSize = compressedSize;
     this.inputAttemptIdentifier = inputAttemptIdentifier;
     this.callback = callbackHandler;
     this.id = ID_GEN.getAndIncrement();
@@ -63,8 +65,12 @@ public abstract class FetchedInput {
     return this.type;
   }
 
-  public long getSize() {
-    return this.size;
+  public long getActualSize() {
+    return this.actualSize;
+  }
+  
+  public long getCompressedSize() {
+    return this.compressedSize;
   }
 
   public InputAttemptIdentifier getInputAttemptIdentifier() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
index 1d60b68..1707ab7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
@@ -24,7 +24,7 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 
 public interface FetchedInputAllocator {
 
-  public FetchedInput allocate(long size,
+  public FetchedInput allocate(long actualSize, long compresedSize,
       InputAttemptIdentifier inputAttemptIdentifier) throws IOException;
   
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
index 96f1caf..70059b0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -27,11 +27,10 @@ import java.net.URLConnection;
 import java.security.GeneralSecurityException;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -92,7 +91,7 @@ public class Fetcher implements Callable<FetchResult> {
 
   // Maps from the pathComponents (unique per srcTaskId) to the specific taskId
   private final Map<String, InputAttemptIdentifier> pathToAttemptMap;
-  private Set<InputAttemptIdentifier> remaining;
+  private LinkedHashSet<InputAttemptIdentifier> remaining;
 
   private URL url;
   private String encHash;
@@ -140,7 +139,7 @@ public class Fetcher implements Callable<FetchResult> {
       pathToAttemptMap.put(in.getPathComponent(), in);
     }
 
-    remaining = new HashSet<InputAttemptIdentifier>(srcAttempts);
+    remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
 
     HttpURLConnection connection;
     try {
@@ -217,10 +216,11 @@ public class Fetcher implements Callable<FetchResult> {
       long startTime = System.currentTimeMillis();
       int responsePartition = -1;
       // Read the shuffle header
+      String pathComponent = null;
       try {
         ShuffleHeader header = new ShuffleHeader();
         header.readFields(input);
-        String pathComponent = header.getMapId();
+        pathComponent = header.getMapId();
 
         srcAttemptId = pathToAttemptMap.get(pathComponent);
         compressedLength = header.getCompressedLength();
@@ -235,7 +235,12 @@ public class Fetcher implements Callable<FetchResult> {
 
       // Do some basic sanity verification
       if (!verifySanity(compressedLength, decompressedLength,
-          responsePartition, srcAttemptId)) {
+          responsePartition, srcAttemptId, pathComponent)) {
+        if (srcAttemptId == null) {
+          LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
+          srcAttemptId = getNextRemainingAttempt();
+        }
+        assert(srcAttemptId != null);
         return new InputAttemptIdentifier[] { srcAttemptId };
       }
 
@@ -245,7 +250,7 @@ public class Fetcher implements Callable<FetchResult> {
       }
 
       // Get the location for the map output - either in-memory or on-disk
-      fetchedInput = inputManager.allocate(decompressedLength, srcAttemptId);
+      fetchedInput = inputManager.allocate(decompressedLength, compressedLength, srcAttemptId);
 
       // TODO NEWTEZ No concept of WAIT at the moment.
       // // Check if we can shuffle *now* ...
@@ -317,18 +322,22 @@ public class Fetcher implements Callable<FetchResult> {
    * @return true/false, based on if the verification succeeded or not
    */
   private boolean verifySanity(long compressedLength, long decompressedLength,
-      int fetchPartition, InputAttemptIdentifier srcAttemptId) {
+      int fetchPartition, InputAttemptIdentifier srcAttemptId, String pathComponent) {
     if (compressedLength < 0 || decompressedLength < 0) {
       // wrongLengthErrs.increment(1);
-      LOG.warn(" invalid lengths in input header: id: " + srcAttemptId
+      LOG.warn(" invalid lengths in input header -> headerPathComponent: "
+          + pathComponent + ", nextRemainingSrcAttemptId: "
+          + getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId
           + " len: " + compressedLength + ", decomp len: " + decompressedLength);
       return false;
     }
 
     if (fetchPartition != this.partition) {
       // wrongReduceErrs.increment(1);
-      LOG.warn(" data for the wrong reduce map: " + srcAttemptId + " len: "
-          + compressedLength + " decomp len: " + decompressedLength
+      LOG.warn(" data for the wrong reduce -> headerPathComponent: "
+          + pathComponent + "nextRemainingSrcAttemptId: "
+          + getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId
+          + " len: " + compressedLength + " decomp len: " + decompressedLength
           + " for reduce " + fetchPartition);
       return false;
     }
@@ -336,11 +345,21 @@ public class Fetcher implements Callable<FetchResult> {
     // Sanity check
     if (!remaining.contains(srcAttemptId)) {
       // wrongMapErrs.increment(1);
-      LOG.warn("Invalid input. Received output for " + srcAttemptId);
+      LOG.warn("Invalid input. Received output for headerPathComponent: "
+          + pathComponent + "nextRemainingSrcAttemptId: "
+          + getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId);
       return false;
     }
     return true;
   }
+  
+  private InputAttemptIdentifier getNextRemainingAttempt() {
+    if (remaining.size() > 0) {
+      return remaining.iterator().next();
+    } else {
+      return null;
+    }
+  }
 
   private HttpURLConnection connectToShuffleHandler(String host, int port,
       int partition, List<InputAttemptIdentifier> inputs) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
index e34301e..cc559d2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
@@ -31,11 +31,11 @@ public class MemoryFetchedInput extends FetchedInput {
 
   private BoundedByteArrayOutputStream byteStream;
 
-  public MemoryFetchedInput(long size,
+  public MemoryFetchedInput(long actualSize, long compressedSize,
       InputAttemptIdentifier inputAttemptIdentifier,
       FetchedInputCallback callbackHandler) {
-    super(Type.MEMORY, size, inputAttemptIdentifier, callbackHandler);
-    this.byteStream = new BoundedByteArrayOutputStream((int) size);
+    super(Type.MEMORY, actualSize, compressedSize, inputAttemptIdentifier, callbackHandler);
+    this.byteStream = new BoundedByteArrayOutputStream((int) actualSize);
   }
 
   @Override
@@ -83,7 +83,8 @@ public class MemoryFetchedInput extends FetchedInput {
   @Override
   public String toString() {
     return "MemoryFetchedInput [inputAttemptIdentifier="
-        + inputAttemptIdentifier + ", size=" + size + ", type=" + type
-        + ", id=" + id + ", state=" + state + "]";
+        + inputAttemptIdentifier + ", actualSize=" + actualSize
+        + ", compressedSize=" + compressedSize + ", type=" + type + ", id="
+        + id + ", state=" + state + "]";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
index e6603c4..2d663a6 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
@@ -53,31 +53,32 @@ public class TestBroadcastInputManager {
     BroadcastInputManager inputManager = new BroadcastInputManager(UUID.randomUUID().toString(), conf);
     
     long requestSize = (long) (0.4f * inMemThreshold);
+    long compressedSize = 1l;
     LOG.info("RequestSize: " + requestSize);
     
-    FetchedInput fi1 = inputManager.allocate(requestSize, new InputAttemptIdentifier(1, 1));
+    FetchedInput fi1 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(1, 1));
     assertEquals(FetchedInput.Type.MEMORY, fi1.getType());
     
     
-    FetchedInput fi2 = inputManager.allocate(requestSize, new InputAttemptIdentifier(2, 1));
+    FetchedInput fi2 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(2, 1));
     assertEquals(FetchedInput.Type.MEMORY, fi2.getType());
     
     
     // Over limit by this point. Next reserve should give back a DISK allocation
-    FetchedInput fi3 = inputManager.allocate(requestSize, new InputAttemptIdentifier(3, 1));
+    FetchedInput fi3 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(3, 1));
     assertEquals(FetchedInput.Type.DISK, fi3.getType());
     
     
     // Freed one memory allocation. Next should be mem again.
     fi1.abort();
     fi1.free();
-    FetchedInput fi4 = inputManager.allocate(requestSize, new InputAttemptIdentifier(4, 1));
+    FetchedInput fi4 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(4, 1));
     assertEquals(FetchedInput.Type.MEMORY, fi4.getType());
     
     // Freed one disk allocation. Next sould be disk again (no mem freed)
     fi3.abort();
     fi3.free();
-    FetchedInput fi5 = inputManager.allocate(requestSize, new InputAttemptIdentifier(4, 1));
+    FetchedInput fi5 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(4, 1));
     assertEquals(FetchedInput.Type.DISK, fi5.getType());
   }