Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:26 UTC

[17/50] [abbrv] tez git commit: TEZ-1731. OnDiskMerger can end up clobbering files across tasks with LocalDiskFetch. (sseth)

TEZ-1731. OnDiskMerger can end up clobbering files across tasks with
LocalDiskFetch. (sseth)
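
In broad terms: with LocalDiskFetch enabled, a DISK_DIRECT MapOutput points straight at the
source task's output file, and OnDiskMerger previously derived its merge-output name from that
input path. Two consumer tasks running on the same node could therefore resolve the same
"<source-output>.merged" target and overwrite each other's merged data. The patch below makes
FileChunk record whether it refers to such a local file (and which InputAttemptIdentifier it
came from), adds TezTaskOutput.getSpillFileName(), and has OnDiskMerger rebuild the output name
from the consumer task's own spill-file pattern. The new TestMergeManager runs two MergeManagers
over the same source files and asserts that their merged outputs land on distinct,
task-specific paths.

An illustrative collision, with hypothetical paths:

    task A merge target: <local-dir>/<src-attempt>/file.out.merged
    task B merge target: <local-dir>/<src-attempt>/file.out.merged   <-- same file, clobbered

After the patch, each name is built from the consumer's own spill pattern (via
mapOutputFile.getSpillFileName(srcIndex)), so the two targets no longer coincide.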


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/57827160
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/57827160
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/57827160

Branch: refs/heads/TEZ-8
Commit: 57827160a8772858f667aacba65c6589234c641b
Parents: 8433c61
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Nov 3 13:39:49 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Nov 3 13:39:49 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../java/org/apache/hadoop/io/FileChunk.java    |  30 ++-
 .../orderedgrouped/FetcherOrderedGrouped.java   |   4 +-
 .../shuffle/orderedgrouped/MapOutput.java       |   7 +-
 .../shuffle/orderedgrouped/MergeManager.java    |  45 ++--
 .../local/output/TezLocalTaskOutputFiles.java   |  10 +-
 .../common/task/local/output/TezTaskOutput.java |   7 +
 .../task/local/output/TezTaskOutputFiles.java   |   7 +-
 .../orderedgrouped/TestMergeManager.java        | 237 +++++++++++++++++++
 9 files changed, 318 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ce5a0d2..f741109 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -97,6 +97,7 @@ ALL CHANGES:
   TEZ-1725. Fix nanosecond to millis conversion in TezMxBeanResourceCalculator.
   TEZ-1726. Build broken against Hadoop-2.6.0 due to change in NodeReport.
   TEZ-1579. MR examples should be setting mapreduce.framework.name to yarn-tez.
+  TEZ-1731. OnDiskMerger can end up clobbering files across tasks with LocalDiskFetch enabled.
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunk.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunk.java b/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunk.java
index a7eb90a..0ba39e2 100644
--- a/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunk.java
+++ b/tez-runtime-library/src/main/java/org/apache/hadoop/io/FileChunk.java
@@ -18,22 +18,34 @@
 
 package org.apache.hadoop.io;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.fs.Path;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 
 @Private
 public class FileChunk implements Comparable<FileChunk> {
 
   private final long offset;
   private final long length;
-  private final boolean preserveAfterUse;
+  private final boolean isLocalFile;
   private final Path path;
+  private final InputAttemptIdentifier identifier;
 
-  public FileChunk(Path path, long offset, long length, boolean preserveAfterUse) {
+  public FileChunk(Path path, long offset, long length, boolean isLocalFile,
+                   InputAttemptIdentifier identifier) {
     this.path = path;
     this.offset = offset;
     this.length = length;
-    this.preserveAfterUse = preserveAfterUse;
+    this.isLocalFile = isLocalFile;
+    this.identifier = identifier;
+    if (isLocalFile) {
+      Preconditions.checkNotNull(identifier);
+    }
+  }
+
+  public FileChunk(Path path, long offset, long length) {
+    this(path, offset, length, false, null);
   }
 
   @Override
@@ -87,11 +99,15 @@ public class FileChunk implements Comparable<FileChunk> {
     return length;
   }
 
-  public boolean preserveAfterUse() {
-    return preserveAfterUse;
-  }
-
   public Path getPath() {
     return path;
   }
+
+  public boolean isLocalFile() {
+    return this.isLocalFile;
+  }
+
+  public InputAttemptIdentifier getInputAttemptIdentifier() {
+    return this.identifier;
+  }
 }
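
A minimal usage sketch of the new FileChunk API (the variable names below are illustrative,
not taken from the patch): a chunk that refers to another task's local output is tagged
isLocalFile and must carry the source InputAttemptIdentifier (enforced by the Preconditions
check above), while a chunk for output the consumer wrote itself can use the shorter
constructor.

    // Illustrative only: producerOutputPath, srcAttemptId, mergedOutputPath are stand-ins.
    FileChunk fetchedLocally =
        new FileChunk(producerOutputPath, offset, length, true, srcAttemptId);
    FileChunk ownMergedOutput =
        new FileChunk(mergedOutputPath, 0, mergedLength);  // isLocalFile == false, no identifier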

http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 2b5a863..e83c705 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -441,7 +441,7 @@ class FetcherOrderedGrouped extends Thread {
         LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength + 
             ", decomp len: " + decompressedLength);
       }
-      
+
       // Get the location for the map output - either in-memory or on-disk
       try {
         mapOutput = merger.reserve(srcAttemptId, decompressedLength, compressedLength, id);
@@ -467,7 +467,7 @@ class FetcherOrderedGrouped extends Thread {
       // Go!
       LOG.info("fetcher#" + id + " about to shuffle output of map " + 
                mapOutput.getAttemptIdentifier() + " decomp: " +
-               decompressedLength + " len: " + compressedLength);
+               decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType());
       if (mapOutput.getType() == Type.MEMORY) {
         ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input,
           (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead,

http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
index c735a43..231975b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
@@ -89,8 +89,11 @@ class MapOutput {
     this.disk = null;
 
     if (type == Type.DISK || type == Type.DISK_DIRECT) {
-      boolean preserve = (type == Type.DISK_DIRECT); // type disk are temp files.
-      this.outputPath = new FileChunk(outputPath, offset, size, preserve);
+      if (type == Type.DISK_DIRECT) {
+        this.outputPath = new FileChunk(outputPath, offset, size, true, attemptIdentifier);
+      } else {
+        this.outputPath = new FileChunk(outputPath, offset, size, false, attemptIdentifier);
+      }
     } else {
       this.outputPath = null;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 0db5237..6081f91 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -88,9 +89,11 @@ public class MergeManager {
   private final Set<MapOutput> inMemoryMapOutputs = 
     new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
   private final InMemoryMerger inMemoryMerger;
-  
-  private final Set<FileChunk> onDiskMapOutputs = new TreeSet<FileChunk>();
-  private final OnDiskMerger onDiskMerger;
+
+  @VisibleForTesting
+  final Set<FileChunk> onDiskMapOutputs = new TreeSet<FileChunk>();
+  @VisibleForTesting
+  final OnDiskMerger onDiskMerger;
   
   private final long memoryLimit;
   private final int postMergeMemLimit;
@@ -131,7 +134,7 @@ public class MergeManager {
   /**
    * Construct the MergeManager. Must call start before it becomes usable.
    */
-  public MergeManager(Configuration conf, 
+  public MergeManager(Configuration conf,
                       FileSystem localFS,
                       LocalDirAllocator localDirAllocator,  
                       InputContext inputContext,
@@ -213,7 +216,7 @@ public class MergeManager {
     }
     
     LOG.info("InitialRequest: ShuffleMem=" + memLimit + ", postMergeMem=" + maxRedBuffer
-        + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + "Updated to: ShuffleMem="
+        + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + ". Updated to: ShuffleMem="
         + this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit);
 
     this.ioSortFactor = 
@@ -253,7 +256,7 @@ public class MergeManager {
       throw new RuntimeException("Invlaid configuration: "
           + "maxSingleShuffleLimit should be less than mergeThreshold"
           + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
-          + "mergeThreshold: " + this.mergeThreshold);
+          + ", mergeThreshold: " + this.mergeThreshold);
     }
     
     boolean allowMemToMemMerge = 
@@ -494,6 +497,7 @@ public class MergeManager {
         return;
       }
 
+
       InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier(); 
       List<Segment> inMemorySegments = new ArrayList<Segment>();
       long mergeOutputSize = 
@@ -629,7 +633,7 @@ public class MergeManager {
       }
 
       // Note the output of the merge
-      closeOnDiskFile(new FileChunk(outputPath, 0, outFileLen, false));
+      closeOnDiskFile(new FileChunk(outputPath, 0, outFileLen));
     }
 
   }
@@ -637,7 +641,8 @@ public class MergeManager {
   /**
    * Merges multiple on-disk segments
    */
-  private class OnDiskMerger extends MergeThread<FileChunk> {
+  @VisibleForTesting
+  class OnDiskMerger extends MergeThread<FileChunk> {
 
     public OnDiskMerger(MergeManager manager) {
       super(manager, ioSortFactor, exceptionReporter);
@@ -668,7 +673,7 @@ public class MergeManager {
       for (FileChunk fileChunk : inputs) {
         final long offset = fileChunk.getOffset();
         final long size = fileChunk.getLength();
-        final boolean preserve = fileChunk.preserveAfterUse();
+        final boolean preserve = fileChunk.isLocalFile();
         final Path file = fileChunk.getPath();
         approxOutputSize += size;
         Segment segment = new Segment(conf, rfs, file, offset, size, codec, ifileReadAhead,
@@ -681,9 +686,19 @@ public class MergeManager {
         ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
 
       // 2. Start the on-disk merge process
-      Path outputPath = 
-        localDirAllocator.getLocalPathForWrite(inputs.get(0).getPath().toString(),
-            approxOutputSize, conf).suffix(Constants.MERGED_OUTPUT_PREFIX);
+      FileChunk file0 = inputs.get(0);
+      String namePart;
+      if (file0.isLocalFile()) {
+        // This is setup the same way a type DISK MapOutput is setup when fetching.
+        namePart = mapOutputFile.getSpillFileName(
+            file0.getInputAttemptIdentifier().getInputIdentifier().getInputIndex());
+
+      } else {
+        namePart = file0.getPath().getName().toString();
+      }
+      Path outputPath = localDirAllocator.getLocalPathForWrite(namePart, approxOutputSize, conf);
+      outputPath = outputPath.suffix(Constants.MERGED_OUTPUT_PREFIX);
+
       Writer writer = 
         new Writer(conf, rfs, outputPath, 
                         (Class)ConfigUtils.getIntermediateInputKeyClass(conf), 
@@ -712,7 +727,7 @@ public class MergeManager {
       }
 
       final long outputLen = localFS.getFileStatus(outputPath).getLen();
-      closeOnDiskFile(new FileChunk(outputPath, 0, outputLen, false));
+      closeOnDiskFile(new FileChunk(outputPath, 0, outputLen));
 
       LOG.info(inputContext.getUniqueIdentifier() +
           " Finished merging " + inputs.size() + 
@@ -857,7 +872,7 @@ public class MergeManager {
 
         final FileStatus fStatus = localFS.getFileStatus(outputPath);
         // add to list of final disk outputs.
-        onDiskMapOutputs.add(new FileChunk(outputPath, 0, fStatus.getLen(), false));
+        onDiskMapOutputs.add(new FileChunk(outputPath, 0, fStatus.getLen()));
 
         LOG.info("Merged " + numMemDiskSegments + " segments, " +
                  inMemToDiskBytes + " bytes to disk to satisfy " +
@@ -885,7 +900,7 @@ public class MergeManager {
           file.toString().endsWith(Constants.MERGED_OUTPUT_PREFIX) ? null : mergedMapOutputsCounter;
 
       final long fileOffset = fileChunk.getOffset();
-      final boolean preserve = fileChunk.preserveAfterUse();
+      final boolean preserve = fileChunk.isLocalFile();
       diskSegments.add(new Segment(job, fs, file, fileOffset, fileLength, codec, ifileReadAhead,
                                    ifileReadAheadLength, ifileBufferSize, preserve, counter));
     }
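
The OnDiskMerger hunk above is the core of the fix. A hedged restatement of the behavioural
change (the code below mirrors the patch and reuses its variable names; it is not additional
patch content): before, the merge target was derived from the first input chunk's full path,
which under LocalDiskFetch belongs to the source task, so concurrent consumers could pick the
same "<source-output>.merged" target; after, a local chunk's name is rebuilt from the consumer's
own spill-file pattern, which is unique per task attempt.

    FileChunk first = inputs.get(0);
    String namePart = first.isLocalFile()
        ? mapOutputFile.getSpillFileName(
              first.getInputAttemptIdentifier().getInputIdentifier().getInputIndex())
        : first.getPath().getName();
    Path outputPath = localDirAllocator
        .getLocalPathForWrite(namePart, approxOutputSize, conf)
        .suffix(Constants.MERGED_OUTPUT_PREFIX);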

http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
index 3d83010..e02011f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
@@ -222,11 +222,17 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
   public Path getInputFileForWrite(int taskId,
                                    long size)
       throws IOException {
-    return lDirAlloc.getLocalPathForWrite(String.format(
-        Constants.TEZ_RUNTIME_TASK_INPUT_FILE_FORMAT_STRING, Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, taskId),
+    return lDirAlloc.getLocalPathForWrite(getSpillFileName(taskId),
         size, conf);
   }
 
+  @Override
+  public String getSpillFileName(int spillNum) {
+    return (String.format(
+        Constants.TEZ_RUNTIME_TASK_INPUT_FILE_FORMAT_STRING, Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR,
+        spillNum));
+  }
+
   /** Removes all of the files related to a task. */
   @Override
   public void removeAll()

http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
index d3e7d27..e34e399 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
@@ -160,6 +160,13 @@ public abstract class TezTaskOutput {
   public abstract Path getInputFileForWrite(
       int taskIdentifier, long size) throws IOException;
 
+  /**
+   * Construct a spill file name, given a spill number
+   * @param spillNum
+   * @return
+   */
+  public abstract String getSpillFileName(int spillNum);
+
   /** Removes all of the files related to a task. */
   public abstract void removeAll() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
index 8ae1a04..86a83ac 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
@@ -234,11 +234,14 @@ public class TezTaskOutputFiles extends TezTaskOutput {
    */
   public Path getInputFileForWrite(int srcTaskId,
       long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(String.format(SPILL_FILE_PATTERN,
-        uniqueId, srcTaskId),
+    return lDirAlloc.getLocalPathForWrite(getSpillFileName(srcTaskId),
         size, conf);
   }
 
+  public String getSpillFileName(int spillNum) {
+    return String.format(SPILL_FILE_PATTERN, uniqueId, spillNum);
+  }
+
   /** Removes all of the files related to a task. */
   public void removeAll() throws IOException {
     throw new UnsupportedOperationException("Incompatible with LocalRunner");
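
TezTaskOutputFiles builds the spill name from SPILL_FILE_PATTERN and the task's uniqueId, so
names produced for different task attempts cannot collide; this is what keeps concurrent tasks
on one node apart. A hedged illustration (taskAOutput and taskBOutput are hypothetical
TezTaskOutputFiles instances created for two different task attempts, so their uniqueId
differs):

    String nameA = taskAOutput.getSpillFileName(0);
    String nameB = taskBOutput.getSpillFileName(0);
    // nameA != nameB, because SPILL_FILE_PATTERN embeds uniqueId; the on-disk merger's
    // output path is derived from this name rather than from the fetched source file.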

http://git-wip-us.apache.org/repos/asf/tez/blob/57827160/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
new file mode 100644
index 0000000..7615ba7
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FileChunk;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMergeManager {
+
+
+  private static final Log LOG = LogFactory.getLog(TestMergeManager.class);
+
+  private static Configuration defaultConf = new TezConfiguration();
+  private static FileSystem localFs = null;
+  private static Path workDir = null;
+
+  static {
+    try {
+      defaultConf.set("fs.defaultFS", "file:///");
+      localFs = FileSystem.getLocal(defaultConf);
+      workDir =
+          new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+              TestMergeManager.class.getSimpleName());
+      workDir = localFs.makeQualified(workDir);
+      localFs.mkdirs(workDir);
+      LOG.info("Using workDir: " + workDir);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  @Before
+  @After
+  public void cleanup() throws IOException {
+    localFs.delete(workDir, true);
+  }
+
+  @Test(timeout = 10000)
+  public void testLocalDiskMergeMultipleTasks() throws IOException {
+
+    Configuration conf = new TezConfiguration(defaultConf);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());
+
+    Path localDir = new Path(workDir, "local");
+    Path srcDir = new Path(workDir, "srcData");
+    localFs.mkdirs(localDir);
+    localFs.mkdirs(srcDir);
+
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDir.toString());
+
+    FileSystem localFs = FileSystem.getLocal(conf);
+    LocalDirAllocator localDirAllocator =
+        new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+    InputContext t0inputContext = createMockInputContext(UUID.randomUUID().toString());
+    InputContext t1inputContext = createMockInputContext(UUID.randomUUID().toString());
+
+    ExceptionReporter t0exceptionReporter = mock(ExceptionReporter.class);
+    ExceptionReporter t1exceptionReporter = mock(ExceptionReporter.class);
+
+    MergeManager t0mergeManagerReal =
+        new MergeManager(conf, localFs, localDirAllocator, t0inputContext, null, null, null, null,
+            t0exceptionReporter, 2000000, null, false, -1);
+    MergeManager t0mergeManager = spy(t0mergeManagerReal);
+
+    MergeManager t1mergeManagerReal =
+        new MergeManager(conf, localFs, localDirAllocator, t1inputContext, null, null, null, null,
+            t1exceptionReporter, 2000000, null, false, -1);
+    MergeManager t1mergeManager = spy(t1mergeManagerReal);
+
+    // Partition 0 Keys 0-2, Partition 1 Keys 3-5
+    SrcFileInfo src1Info =
+        createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src1.out"),
+            2, 3, 0);
+    // Partition 0 Keys 6-8, Partition 1 Keys 9-11
+    SrcFileInfo src2Info =
+        createFile(conf, localFs, new Path(srcDir, InputAttemptIdentifier.PATH_PREFIX + "src2.out"),
+            2, 3, 6);
+
+
+    // Simulating Task 0 fetches partition 0. (targetIndex = 0,1)
+
+    // Simulating Task 1 fetches partition 1. (targetIndex = 0,1)
+
+    InputAttemptIdentifier t0Identifier0 =
+        new InputAttemptIdentifier(0, 0, src1Info.path.getName());
+    InputAttemptIdentifier t0Identifier1 =
+        new InputAttemptIdentifier(1, 0, src2Info.path.getName());
+
+    InputAttemptIdentifier t1Identifier0 =
+        new InputAttemptIdentifier(0, 0, src1Info.path.getName());
+    InputAttemptIdentifier t1Identifier1 =
+        new InputAttemptIdentifier(1, 0, src2Info.path.getName());
+
+
+    MapOutput t0MapOutput0 =
+        getMapOutputForDirectDiskFetch(t0Identifier0, src1Info.path, src1Info.indexedRecords[0],
+            t0mergeManager);
+    MapOutput t0MapOutput1 =
+        getMapOutputForDirectDiskFetch(t0Identifier1, src2Info.path, src2Info.indexedRecords[0],
+            t0mergeManager);
+
+    MapOutput t1MapOutput0 =
+        getMapOutputForDirectDiskFetch(t1Identifier0, src1Info.path, src1Info.indexedRecords[1],
+            t1mergeManager);
+    MapOutput t1MapOutput1 =
+        getMapOutputForDirectDiskFetch(t1Identifier1, src2Info.path, src2Info.indexedRecords[1],
+            t1mergeManager);
+
+
+    t0MapOutput0.commit();
+    t0MapOutput1.commit();
+    verify(t0mergeManager).closeOnDiskFile(t0MapOutput0.getOutputPath());
+    verify(t0mergeManager).closeOnDiskFile(t0MapOutput1.getOutputPath());
+    // Run the OnDiskMerge via MergeManager
+    // Simulate the thread invocation - remove files, and invoke merge
+    List<FileChunk> t0MergeFiles = new LinkedList<FileChunk>();
+    t0MergeFiles.addAll(t0mergeManager.onDiskMapOutputs);
+    t0mergeManager.onDiskMapOutputs.clear();
+    t0mergeManager.onDiskMerger.merge(t0MergeFiles);
+    Assert.assertEquals(1, t0mergeManager.onDiskMapOutputs.size());
+
+
+    t1MapOutput0.commit();
+    t1MapOutput1.commit();
+    verify(t1mergeManager).closeOnDiskFile(t1MapOutput0.getOutputPath());
+    verify(t1mergeManager).closeOnDiskFile(t1MapOutput1.getOutputPath());
+    // Run the OnDiskMerge via MergeManager
+    // Simulate the thread invocation - remove files, and invoke merge
+    List<FileChunk> t1MergeFiles = new LinkedList<FileChunk>();
+    t1MergeFiles.addAll(t1mergeManager.onDiskMapOutputs);
+    t1mergeManager.onDiskMapOutputs.clear();
+    t1mergeManager.onDiskMerger.merge(t1MergeFiles);
+    Assert.assertEquals(1, t1mergeManager.onDiskMapOutputs.size());
+
+    Assert.assertNotEquals(t0mergeManager.onDiskMapOutputs.iterator().next().getPath(),
+        t1mergeManager.onDiskMapOutputs.iterator().next().getPath());
+
+    Assert.assertTrue(t0mergeManager.onDiskMapOutputs.iterator().next().getPath().toString()
+        .contains(t0inputContext.getUniqueIdentifier()));
+    Assert.assertTrue(t1mergeManager.onDiskMapOutputs.iterator().next().getPath().toString()
+        .contains(t1inputContext.getUniqueIdentifier()));
+
+  }
+
+  private InputContext createMockInputContext(String uniqueId) {
+    InputContext inputContext = mock(InputContext.class);
+    doReturn(new TezCounters()).when(inputContext).getCounters();
+    doReturn(200 * 1024 * 1024l).when(inputContext).getTotalMemoryAvailableToTask();
+    doReturn("srcVertexName").when(inputContext).getSourceVertexName();
+    doReturn(uniqueId).when(inputContext).getUniqueIdentifier();
+    return inputContext;
+  }
+
+  private SrcFileInfo createFile(Configuration conf, FileSystem fs, Path path, int numPartitions,
+                                 int numKeysPerPartition, int startKey) throws IOException {
+    FSDataOutputStream outStream = fs.create(path);
+    int currentKey = startKey;
+    SrcFileInfo srcFileInfo = new SrcFileInfo();
+    srcFileInfo.indexedRecords = new TezIndexRecord[numPartitions];
+    srcFileInfo.path = path;
+    for (int i = 0; i < numPartitions; i++) {
+      long pos = outStream.getPos();
+      IFile.Writer writer =
+          new IFile.Writer(conf, outStream, IntWritable.class, IntWritable.class, null, null, null);
+      for (int j = 0; j < numKeysPerPartition; j++) {
+        writer.append(new IntWritable(currentKey), new IntWritable(currentKey));
+        currentKey++;
+      }
+      writer.close();
+      srcFileInfo.indexedRecords[i] =
+          new TezIndexRecord(pos, writer.getRawLength(), writer.getCompressedLength());
+    }
+    outStream.close();
+    return srcFileInfo;
+  }
+
+  private class SrcFileInfo {
+    private Path path;
+    private TezIndexRecord[] indexedRecords;
+  }
+
+  // Copied from FetcherOrderedGrouped
+  private static MapOutput getMapOutputForDirectDiskFetch(InputAttemptIdentifier srcAttemptId,
+                                                          Path filename, TezIndexRecord indexRecord,
+                                                          MergeManager merger)
+      throws IOException {
+    return MapOutput.createLocalDiskMapOutput(srcAttemptId, merger, filename,
+        indexRecord.getStartOffset(), indexRecord.getPartLength(), true);
+  }
+}