You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2017/03/13 03:56:29 UTC

tez git commit: TEZ-3650. Improve performance of FetchStatsLogger#logIndividualFetchComplete (jeagles)

Repository: tez
Updated Branches:
  refs/heads/master c11810440 -> a9af6cfcc


TEZ-3650. Improve performance of FetchStatsLogger#logIndividualFetchComplete (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a9af6cfc
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a9af6cfc
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a9af6cfc

Branch: refs/heads/master
Commit: a9af6cfccb8069afb01cbac3d373ffa0e2ecba93
Parents: c118104
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Sun Mar 12 22:56:02 2017 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Sun Mar 12 22:56:02 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/util/FastNumberFormat.java   | 55 +++++++++++
 .../org/apache/tez/util/TestNumberFormat.java   | 39 ++++++++
 .../runtime/library/common/shuffle/Fetcher.java |  4 +-
 .../library/common/shuffle/ShuffleUtils.java    | 85 +++++++++++------
 .../orderedgrouped/FetcherOrderedGrouped.java   |  4 +-
 .../common/shuffle/TestShuffleUtils.java        |  6 +-
 .../orderedgrouped/TestMergeManager.java        | 98 ++++++++++----------
 8 files changed, 209 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 48ccb54..54b17b1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3650. Improve performance of FetchStatsLogger#logIndividualFetchComplete
   TEZ-3655. Specify netty version instead of inheriting from hadoop dependency.
   TEZ-3253. Remove special handling for last app attempt.
   TEZ-3648. IFile.Write#close has an extra output stream flush

http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-common/src/main/java/org/apache/tez/util/FastNumberFormat.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/util/FastNumberFormat.java b/tez-common/src/main/java/org/apache/tez/util/FastNumberFormat.java
new file mode 100644
index 0000000..f22fc64
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/util/FastNumberFormat.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.util;
+
+public class FastNumberFormat {
+
+  public static final int MAX_COUNT = 19;
+  private final char[] digits = new char[MAX_COUNT];
+  private int minimumIntegerDigits;
+
+  public static FastNumberFormat getInstance() {
+    return new FastNumberFormat();
+  }
+
+  public void setMinimumIntegerDigits(int minimumIntegerDigits) {
+    this.minimumIntegerDigits = minimumIntegerDigits;
+  }
+
+  public StringBuilder format(long source, StringBuilder sb) {
+    int left = MAX_COUNT;
+    if (source < 0) {
+      sb.append('-');
+      source = - source;
+    }
+    while (source > 0) {
+      digits[--left] = (char)('0' + (source % 10));
+      source /= 10;
+    }
+    while (MAX_COUNT - left < minimumIntegerDigits) {
+      digits[--left] = '0';
+    }
+    sb.append(digits, left, MAX_COUNT - left);
+    return sb;
+  }
+
+  public String format(long source) {
+    return format(source, new StringBuilder()).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-common/src/test/java/org/apache/tez/util/TestNumberFormat.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/util/TestNumberFormat.java b/tez-common/src/test/java/org/apache/tez/util/TestNumberFormat.java
new file mode 100644
index 0000000..c2f1185
--- /dev/null
+++ b/tez-common/src/test/java/org/apache/tez/util/TestNumberFormat.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.text.NumberFormat;
+
+public class TestNumberFormat {
+
+  @Test(timeout = 1000)
+  public void testLongWithPadding() throws Exception {
+    FastNumberFormat fastNumberFormat = FastNumberFormat.getInstance();
+    fastNumberFormat.setMinimumIntegerDigits(6);
+    NumberFormat numberFormat = NumberFormat.getInstance();
+    numberFormat.setGroupingUsed(false);
+    numberFormat.setMinimumIntegerDigits(6);
+    long[] testLongs = {1, 23, 456, 7890, 12345, 678901, 2345689, 0, -0, -1, -23, -456, -7890, -12345, -678901, -2345689};
+    for (long l: testLongs) {
+      Assert.assertEquals("Number formats should be equal", numberFormat.format(l), fastNumberFormat.format(l));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 6cbff94..9d1f42a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -814,11 +814,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
         ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(),
           input, (int) decompressedLength, (int) compressedLength, codec,
           ifileReadAhead, ifileReadAheadLength, LOG,
-          fetchedInput.getInputAttemptIdentifier().toString());
+          fetchedInput.getInputAttemptIdentifier());
       } else if (fetchedInput.getType() == Type.DISK) {
         ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
           (host +":" +port), input, compressedLength, decompressedLength, LOG,
-          fetchedInput.getInputAttemptIdentifier().toString(),
+          fetchedInput.getInputAttemptIdentifier(),
           ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
       } else {
         throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " +

http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 82e844d..caddbc8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -48,6 +48,7 @@ import org.apache.tez.http.SSLFactory;
 import org.apache.tez.http.async.netty.AsyncHttpConnection;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
+import org.apache.tez.util.FastNumberFormat;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,6 +89,15 @@ public class ShuffleUtils {
           return new DecimalFormat("0.00");
         }
       };
+  static final ThreadLocal<FastNumberFormat> MBPS_FAST_FORMAT =
+      new ThreadLocal<FastNumberFormat>() {
+        @Override
+        protected FastNumberFormat initialValue() {
+          FastNumberFormat fmt = FastNumberFormat.getInstance();
+          fmt.setMinimumIntegerDigits(2);
+          return fmt;
+        }
+      };
 
   public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
       throws IOException {
@@ -119,7 +129,7 @@ public class ShuffleUtils {
   public static void shuffleToMemory(byte[] shuffleData,
       InputStream input, int decompressedLength, int compressedLength,
       CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength,
-      Logger LOG, String identifier) throws IOException {
+      Logger LOG, InputAttemptIdentifier identifier) throws IOException {
     try {
       IFile.Reader.readToMemory(shuffleData, input, compressedLength, codec,
           ifileReadAhead, ifileReadAheadLength);
@@ -145,7 +155,7 @@ public class ShuffleUtils {
   }
   
   public static void shuffleToDisk(OutputStream output, String hostIdentifier,
-      InputStream input, long compressedLength, long decompressedLength, Logger LOG, String identifier,
+      InputStream input, long compressedLength, long decompressedLength, Logger LOG, InputAttemptIdentifier identifier,
       boolean ifileReadAhead, int ifileReadAheadLength, boolean verifyChecksum) throws IOException {
     // Copy data to local-disk
     long bytesLeft = compressedLength;
@@ -530,6 +540,20 @@ public class ShuffleUtils {
       this.aggregateLogger = aggregateLogger;
     }
 
+
+    private static StringBuilder toShortString(InputAttemptIdentifier inputAttemptIdentifier, StringBuilder sb) {
+      sb.append("{");
+      sb.append(inputAttemptIdentifier.getInputIdentifier());
+      sb.append(", ").append(inputAttemptIdentifier.getAttemptNumber());
+      sb.append(", ").append(inputAttemptIdentifier.getPathComponent());
+      if (inputAttemptIdentifier.getFetchTypeInfo()
+          != InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED) {
+        sb.append(", ").append(inputAttemptIdentifier.getFetchTypeInfo().ordinal());
+        sb.append(", ").append(inputAttemptIdentifier.getSpillEventId());
+      }
+      sb.append("}");
+      return sb;
+    }
     /**
      * Log individual fetch complete event.
      * This log information would be used by tez-tool/perf-analzyer/shuffle tools for mining
@@ -545,19 +569,37 @@ public class ShuffleUtils {
      */
     public void logIndividualFetchComplete(long millis, long bytesCompressed,
         long bytesDecompressed, String outputType, InputAttemptIdentifier srcAttemptIdentifier) {
-      double rate = 0;
-      if (millis != 0) {
-        rate = bytesCompressed / ((double) millis / 1000);
-        rate = rate / (1024 * 1024);
-      }
+
       if (activeLogger.isInfoEnabled()) {
-        activeLogger.info(
-            "Completed fetch for attempt: "
-                + toShortString(srcAttemptIdentifier)
-                +" to " + outputType +
-                ", csize=" + bytesCompressed + ", dsize=" + bytesDecompressed +
-                ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
-                MBPS_FORMAT.get().format(rate) + " MB/s");
+        long wholeMBs = 0;
+        long partialMBs = 0;
+        if (millis != 0) {
+          // fast math is done using integer math to avoid double to string conversion
+          // calculate B/s * 100 to preserve MBs precision to two decimal places
+          // multiply numerator by 100000 (2^5 * 5^5) and divide denominator by MB (2^20)
+          // simply fraction to protect ourselves from overflow by factoring out 2^5
+          wholeMBs = (bytesCompressed * 3125) / (millis * 32768);
+          partialMBs = wholeMBs % 100;
+          wholeMBs /= 100;
+        }
+        StringBuilder sb = new StringBuilder("Completed fetch for attempt: ");
+        toShortString(srcAttemptIdentifier, sb);
+        sb.append(" to ");
+        sb.append(outputType);
+        sb.append(", csize=");
+        sb.append(bytesCompressed);
+        sb.append(", dsize=");
+        sb.append(bytesDecompressed);
+        sb.append(", EndTime=");
+        sb.append(System.currentTimeMillis());
+        sb.append(", TimeTaken=");
+        sb.append(millis);
+        sb.append(", Rate=");
+        sb.append(wholeMBs);
+        sb.append(".");
+        MBPS_FAST_FORMAT.get().format(partialMBs, sb);
+        sb.append(" MB/s");
+        activeLogger.info(sb.toString());
       } else {
         long currentCount, currentCompressedSize, currentDecompressedSize, currentTotalTime;
         synchronized (this) {
@@ -583,21 +625,6 @@ public class ShuffleUtils {
     }
   }
 
-  private static String toShortString(InputAttemptIdentifier inputAttemptIdentifier) {
-    StringBuilder sb = new StringBuilder();
-    sb.append("{");
-    sb.append(inputAttemptIdentifier.getInputIdentifier());
-    sb.append(", ").append(inputAttemptIdentifier.getAttemptNumber());
-    sb.append(", ").append(inputAttemptIdentifier.getPathComponent());
-    if (inputAttemptIdentifier.getFetchTypeInfo()
-        != InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED) {
-      sb.append(", ").append(inputAttemptIdentifier.getFetchTypeInfo().ordinal());
-      sb.append(", ").append(inputAttemptIdentifier.getSpillEventId());
-    }
-    sb.append("}");
-    return sb.toString();
-  }
-
   /**
    * Build {@link org.apache.tez.http.HttpConnectionParams} from configuration
    *

http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index bcb75d2..58ca1e2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -504,11 +504,11 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
       if (mapOutput.getType() == Type.MEMORY) {
         ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input,
           (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead,
-          ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString());
+          ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier());
       } else if (mapOutput.getType() == Type.DISK) {
         ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(),
           input, compressedLength, decompressedLength, LOG,
-          mapOutput.getAttemptIdentifier().toString(),
+          mapOutput.getAttemptIdentifier(),
           ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
       } else {
         throw new IOException("Unknown mapOutput type while fetching shuffle data:" +

http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index f21da7c..b1ce716 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -284,7 +284,7 @@ public class TestShuffleUtils {
     byte[] header = new byte[] { (byte) 'T', (byte) 'I', (byte) 'F', (byte) 1};
     try {
       ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(header),
-          1024, 128, mockCodec, false, 0, mock(Logger.class), "identifier");
+          1024, 128, mockCodec, false, 0, mock(Logger.class), null);
       Assert.fail("shuffle was supposed to throw!");
     } catch (IOException e) {
       Assert.assertTrue(e.getCause() instanceof InternalError);
@@ -301,14 +301,14 @@ public class TestShuffleUtils {
     ByteArrayInputStream in = new ByteArrayInputStream(bogusData);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     ShuffleUtils.shuffleToDisk(baos, "somehost", in,
-        bogusData.length, 2000, mock(Logger.class), "identifier", false, 0, false);
+        bogusData.length, 2000, mock(Logger.class), null, false, 0, false);
     Assert.assertArrayEquals(bogusData, baos.toByteArray());
 
     // verify sending same stream of zeroes with validation generates an exception
     in.reset();
     try {
       ShuffleUtils.shuffleToDisk(mock(OutputStream.class), "somehost", in,
-          bogusData.length, 2000, mock(Logger.class), "identifier", false, 0, true);
+          bogusData.length, 2000, mock(Logger.class), null, false, 0, true);
       Assert.fail("shuffle was supposed to throw!");
     } catch (IOException e) {
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 9209ff4..a812728 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -235,8 +235,8 @@ public class TestMergeManager {
     assertEquals(0, mergeManager.getUsedMemory());
     assertEquals(0, mergeManager.getCommitMemory());
 
-    byte[] data1 = generateData(conf, 10);
-    byte[] data2 = generateData(conf, 20);
+    byte[] data1 = generateData(conf, 10, null);
+    byte[] data2 = generateData(conf, 20, null);
     MapOutput firstMapOutput = mergeManager.reserve(null, data1.length, data1.length, 0);
     MapOutput secondMapOutput = mergeManager.reserve(null, data2.length, data2.length, 0);
     assertEquals(MapOutput.Type.MEMORY, firstMapOutput.getType());
@@ -294,15 +294,19 @@ public class TestMergeManager {
      * - After 3 segment commits, it would trigger mem-to-mem merge.
      * - All of them can be merged in memory.
      */
-    byte[] data1 = generateDataBySize(conf, 10);
-    byte[] data2 = generateDataBySize(conf, 20);
-    byte[] data3 = generateDataBySize(conf, 200);
-    byte[] data4 = generateDataBySize(conf, 20000);
-
-    MapOutput mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0);
-    MapOutput mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0);
-    MapOutput mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0);
-    MapOutput mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0);
+    InputAttemptIdentifier inputAttemptIdentifier1 = new InputAttemptIdentifier(0,0);
+    InputAttemptIdentifier inputAttemptIdentifier2 = new InputAttemptIdentifier(1,0);
+    InputAttemptIdentifier inputAttemptIdentifier3 = new InputAttemptIdentifier(2,0);
+    InputAttemptIdentifier inputAttemptIdentifier4 = new InputAttemptIdentifier(3,0);
+    byte[] data1 = generateDataBySize(conf, 10, inputAttemptIdentifier1);
+    byte[] data2 = generateDataBySize(conf, 20, inputAttemptIdentifier2);
+    byte[] data3 = generateDataBySize(conf, 200, inputAttemptIdentifier3);
+    byte[] data4 = generateDataBySize(conf, 20000, inputAttemptIdentifier4);
+
+    MapOutput mo1 = mergeManager.reserve(inputAttemptIdentifier1, data1.length, data1.length, 0);
+    MapOutput mo2 = mergeManager.reserve(inputAttemptIdentifier1, data2.length, data2.length, 0);
+    MapOutput mo3 = mergeManager.reserve(inputAttemptIdentifier1, data3.length, data3.length, 0);
+    MapOutput mo4 = mergeManager.reserve(inputAttemptIdentifier1, data4.length, data4.length, 0);
 
     assertEquals(MapOutput.Type.MEMORY, mo1.getType());
     assertEquals(MapOutput.Type.MEMORY, mo2.getType());
@@ -351,15 +355,15 @@ public class TestMergeManager {
     mergeManager.configureAndStart();
 
     //Single shuffle limit is 25% of 2000000
-    data1 = generateDataBySize(conf, 10);
-    data2 = generateDataBySize(conf, 400000);
-    data3 = generateDataBySize(conf, 400000);
-    data4 = generateDataBySize(conf, 400000);
+    data1 = generateDataBySize(conf, 10, inputAttemptIdentifier1);
+    data2 = generateDataBySize(conf, 400000, inputAttemptIdentifier2);
+    data3 = generateDataBySize(conf, 400000, inputAttemptIdentifier3);
+    data4 = generateDataBySize(conf, 400000, inputAttemptIdentifier4);
 
-    mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0);
-    mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0);
-    mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0);
-    mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0);
+    mo1 = mergeManager.reserve(inputAttemptIdentifier1, data1.length, data1.length, 0);
+    mo2 = mergeManager.reserve(inputAttemptIdentifier2, data2.length, data2.length, 0);
+    mo3 = mergeManager.reserve(inputAttemptIdentifier3, data3.length, data3.length, 0);
+    mo4 = mergeManager.reserve(inputAttemptIdentifier4, data4.length, data4.length, 0);
 
     assertEquals(MapOutput.Type.MEMORY, mo1.getType());
     assertEquals(MapOutput.Type.MEMORY, mo2.getType());
@@ -409,15 +413,15 @@ public class TestMergeManager {
     mergeManager.configureAndStart();
 
     //Single shuffle limit is 25% of 2000000
-    data1 = generateDataBySize(conf, 400000);
-    data2 = generateDataBySize(conf, 400000);
-    data3 = generateDataBySize(conf, 400000);
-    data4 = generateDataBySize(conf, 400000);
+    data1 = generateDataBySize(conf, 400000, inputAttemptIdentifier1);
+    data2 = generateDataBySize(conf, 400000, inputAttemptIdentifier2);
+    data3 = generateDataBySize(conf, 400000, inputAttemptIdentifier3);
+    data4 = generateDataBySize(conf, 400000, inputAttemptIdentifier4);
 
-    mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0);
-    mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0);
-    mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0);
-    mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0);
+    mo1 = mergeManager.reserve(inputAttemptIdentifier1, data1.length, data1.length, 0);
+    mo2 = mergeManager.reserve(inputAttemptIdentifier2, data2.length, data2.length, 0);
+    mo3 = mergeManager.reserve(inputAttemptIdentifier3, data3.length, data3.length, 0);
+    mo4 = mergeManager.reserve(inputAttemptIdentifier4, data4.length, data4.length, 0);
 
     assertEquals(MapOutput.Type.MEMORY, mo1.getType());
     assertEquals(MapOutput.Type.MEMORY, mo2.getType());
@@ -465,15 +469,15 @@ public class TestMergeManager {
     mergeManager.configureAndStart();
 
     //Single shuffle limit is 25% of 2000000
-    data1 = generateDataBySize(conf, 490000);
-    data2 = generateDataBySize(conf, 490000);
-    data3 = generateDataBySize(conf, 490000);
-    data4 = generateDataBySize(conf, 230000);
+    data1 = generateDataBySize(conf, 490000, inputAttemptIdentifier1);
+    data2 = generateDataBySize(conf, 490000, inputAttemptIdentifier2);
+    data3 = generateDataBySize(conf, 490000, inputAttemptIdentifier3);
+    data4 = generateDataBySize(conf, 230000, inputAttemptIdentifier4);
 
-    mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0);
-    mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0);
-    mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0);
-    mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0);
+    mo1 = mergeManager.reserve(inputAttemptIdentifier1, data1.length, data1.length, 0);
+    mo2 = mergeManager.reserve(inputAttemptIdentifier2, data2.length, data2.length, 0);
+    mo3 = mergeManager.reserve(inputAttemptIdentifier3, data3.length, data3.length, 0);
+    mo4 = mergeManager.reserve(inputAttemptIdentifier4, data4.length, data4.length, 0);
 
     assertTrue(mergeManager.getUsedMemory() >= (490000 + 490000 + 490000 + 23000));
 
@@ -520,15 +524,15 @@ public class TestMergeManager {
     mergeManager.configureAndStart();
 
     //Single shuffle limit is 25% of 2000000
-    data1 = generateDataBySize(conf, 490000);
-    data2 = generateDataBySize(conf, 490000);
-    data3 = generateDataBySize(conf, 490000);
-    data4 = generateDataBySize(conf, 230000);
+    data1 = generateDataBySize(conf, 490000, inputAttemptIdentifier1);
+    data2 = generateDataBySize(conf, 490000, inputAttemptIdentifier2);
+    data3 = generateDataBySize(conf, 490000, inputAttemptIdentifier3);
+    data4 = generateDataBySize(conf, 230000, inputAttemptIdentifier4);
 
-    mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0);
-    mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0);
-    mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0);
-    mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0);
+    mo1 = mergeManager.reserve(inputAttemptIdentifier1, data1.length, data1.length, 0);
+    mo2 = mergeManager.reserve(inputAttemptIdentifier2, data2.length, data2.length, 0);
+    mo3 = mergeManager.reserve(inputAttemptIdentifier3, data3.length, data3.length, 0);
+    mo4 = mergeManager.reserve(inputAttemptIdentifier4, data4.length, data4.length, 0);
 
     assertTrue(mergeManager.getUsedMemory() >= (490000 + 490000 + 490000 + 23000));
 
@@ -566,7 +570,7 @@ public class TestMergeManager {
     Assert.assertFalse(mergeManager.isMergeComplete());
   }
 
-  private byte[] generateDataBySize(Configuration conf, int rawLen) throws IOException {
+  private byte[] generateDataBySize(Configuration conf, int rawLen, InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
     IFile.Writer writer =
@@ -584,11 +588,11 @@ public class TestMergeManager {
     int rawLength = (int)writer.getRawLength();
     byte[] data = new byte[rawLength];
     ShuffleUtils.shuffleToMemory(data, new ByteArrayInputStream(baos.toByteArray()),
-        rawLength, compressedLength, null, false, 0, LOG, "sometask");
+        rawLength, compressedLength, null, false, 0, LOG, inputAttemptIdentifier);
     return data;
   }
 
-  private byte[] generateData(Configuration conf, int numEntries) throws IOException {
+  private byte[] generateData(Configuration conf, int numEntries, InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
     IFile.Writer writer =
@@ -601,7 +605,7 @@ public class TestMergeManager {
     int rawLength = (int)writer.getRawLength();
     byte[] data = new byte[rawLength];
     ShuffleUtils.shuffleToMemory(data, new ByteArrayInputStream(baos.toByteArray()),
-        rawLength, compressedLength, null, false, 0, LOG, "sometask");
+        rawLength, compressedLength, null, false, 0, LOG, inputAttemptIdentifier);
     return data;
   }