You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by sr...@apache.org on 2016/02/18 10:58:01 UTC

[07/24] tez git commit: TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs (jeagles)

TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3ff360aa
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3ff360aa
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3ff360aa

Branch: refs/heads/TEZ-2980
Commit: 3ff360aa18373f2b4aa03648de905c690ce5a180
Parents: 7e636a5
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Fri Jan 29 13:43:59 2016 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Fri Jan 29 13:43:59 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../tez/dag/app/dag/impl/TestDAGRecovery.java   |   1 -
 .../library/common/InputAttemptIdentifier.java  |  28 ++---
 .../common/shuffle/DiskFetchedInput.java        |   2 +-
 .../library/common/shuffle/ShuffleUtils.java    |   2 +-
 .../impl/ShuffleInputEventHandlerImpl.java      |   3 +-
 .../common/shuffle/impl/ShuffleManager.java     |  23 ++---
 .../FetchedInputAllocatorOrderedGrouped.java    |   3 +
 .../shuffle/orderedgrouped/InMemoryReader.java  | 102 ++++++++++++++++++-
 .../shuffle/orderedgrouped/MapOutput.java       |  32 +++---
 .../shuffle/orderedgrouped/MergeManager.java    |  11 +-
 .../ShuffleInputEventHandlerOrderedGrouped.java |   3 +-
 .../orderedgrouped/ShuffleScheduler.java        |  29 +++---
 .../runtime/library/common/sort/impl/IFile.java |   6 +-
 .../library/common/shuffle/TestFetcher.java     |  27 +++--
 .../impl/TestShuffleInputEventHandlerImpl.java  |  11 +-
 .../shuffle/orderedgrouped/TestFetcher.java     |  29 +++---
 ...tShuffleInputEventHandlerOrderedGrouped.java |  17 ++--
 .../orderedgrouped/TestShuffleScheduler.java    |  65 ++++++------
 19 files changed, 238 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d69390c..6570f8b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs
   TEZ-3079. Fix tez-tfile parser documentation.
   TEZ-3066. TaskAttemptFinishedEvent ConcurrentModificationException in recovery or history logging services.
   TEZ-3036. Tez AM can hang on startup with no indication of error
@@ -322,6 +323,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs
   TEZ-3066. TaskAttemptFinishedEvent ConcurrentModificationException in recovery or history logging services.
   TEZ-3036. Tez AM can hang on startup with no indication of error
   TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 3a602bc..6be682d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -132,7 +132,6 @@ import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
index d70942c..cc9c6ea 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
@@ -27,7 +27,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
 @Private
 public class InputAttemptIdentifier {
 
-  private final InputIdentifier inputIdentifier;
+  private final int inputIdentifier;
   private final int attemptNumber;
   private final String pathComponent;
   private final boolean shared;
@@ -49,18 +49,18 @@ public class InputAttemptIdentifier {
   private final int spillEventId;
 
   public InputAttemptIdentifier(int inputIndex, int attemptNumber) {
-    this(new InputIdentifier(inputIndex), attemptNumber, null);
+    this(inputIndex, attemptNumber, null);
   }
 
-  public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent) {
+  public InputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent) {
     this(inputIdentifier, attemptNumber, pathComponent, false);
   }
 
-  public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent, boolean shared) {
+  public InputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent, boolean shared) {
     this(inputIdentifier, attemptNumber, pathComponent, shared, SPILL_INFO.FINAL_MERGE_ENABLED, -1);
   }
 
-  public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent,
+  public InputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent,
       boolean shared, SPILL_INFO fetchTypeInfo, int spillEventId) {
     this.inputIdentifier = inputIdentifier;
     this.attemptNumber = attemptNumber;
@@ -74,15 +74,7 @@ public class InputAttemptIdentifier {
     }
   }
 
-  public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent) {
-    this(new InputIdentifier(taskIndex), attemptNumber, pathComponent);
-  }
-
-  public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent, boolean shared) {
-    this(new InputIdentifier(taskIndex), attemptNumber, pathComponent, shared);
-  }
-
-  public InputIdentifier getInputIdentifier() {
+  public int getInputIdentifier() {
     return this.inputIdentifier;
   }
 
@@ -117,8 +109,7 @@ public class InputAttemptIdentifier {
     final int prime = 31;
     int result = 1;
     result = prime * result + attemptNumber;
-    result = prime * result
-        + ((inputIdentifier == null) ? 0 : inputIdentifier.hashCode());
+    result = prime * result + inputIdentifier;
     return result;
   }
 
@@ -133,10 +124,7 @@ public class InputAttemptIdentifier {
     InputAttemptIdentifier other = (InputAttemptIdentifier) obj;
     if (attemptNumber != other.attemptNumber)
       return false;
-    if (inputIdentifier == null) {
-      if (other.inputIdentifier != null)
-        return false;
-    } else if (!inputIdentifier.equals(other.inputIdentifier))
+    if (inputIdentifier != other.inputIdentifier)
       return false;
     // do not compare pathComponent as they may not always be present
     return true;

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
index dfad39d..c873af7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
@@ -50,7 +50,7 @@ public class DiskFetchedInput extends FetchedInput {
 
     this.localFS = FileSystem.getLocal(conf).getRaw();
     this.outputPath = filenameAllocator.getInputFileForWrite(
-        this.inputAttemptIdentifier.getInputIdentifier().getInputIndex(), this
+        this.inputAttemptIdentifier.getInputIdentifier(), this
             .inputAttemptIdentifier.getSpillEventId(), actualSize);
     // Files are not clobbered due to the id being appended to the outputPath in the tmpPath,
     // otherwise fetches for the same task but from different attempts would clobber each other.

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 431ba38..e8bf6ae 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -498,7 +498,7 @@ public class ShuffleUtils {
   private static String toShortString(InputAttemptIdentifier inputAttemptIdentifier) {
     StringBuilder sb = new StringBuilder();
     sb.append("{");
-    sb.append(inputAttemptIdentifier.getInputIdentifier().getInputIndex());
+    sb.append(inputAttemptIdentifier.getInputIdentifier());
     sb.append(", ").append(inputAttemptIdentifier.getAttemptNumber());
     sb.append(", ").append(inputAttemptIdentifier.getPathComponent());
     if (inputAttemptIdentifier.getFetchTypeInfo()

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
index 8fb1568..adc3432 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
@@ -37,7 +37,6 @@ import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
@@ -176,7 +175,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
       InputAttemptIdentifier.SPILL_INFO spillInfo = (lastEvent) ? InputAttemptIdentifier.SPILL_INFO
           .FINAL_UPDATE : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE;
       srcAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(dmEvent.getTargetIndex()), dmEvent
+          new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent
               .getVersion(), pathComponent, isShared, spillInfo, spillEventId);
     } else {
       srcAttemptIdentifier =

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index b3e050a..7f2054b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -67,7 +67,6 @@ import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.shuffle.FetchResult;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
@@ -111,7 +110,7 @@ public class ShuffleManager implements FetcherCallback {
   
   private final BlockingQueue<FetchedInput> completedInputs;
   private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
-  private final Set<InputIdentifier> completedInputSet;
+  private final Set<Integer> completedInputSet;
   private final ConcurrentMap<String, InputHost> knownSrcHosts;
   private final BlockingQueue<InputHost> pendingHosts;
   private final Set<InputAttemptIdentifier> obsoletedInputs;
@@ -171,7 +170,7 @@ public class ShuffleManager implements FetcherCallback {
 
   //To track shuffleInfo events when finalMerge is disabled OR pipelined shuffle is enabled in source.
   @VisibleForTesting
-  final Map<InputIdentifier, ShuffleEventInfo> shuffleInfoEventsMap;
+  final Map<Integer, ShuffleEventInfo> shuffleInfoEventsMap;
 
   // TODO More counters - FetchErrors, speed?
   
@@ -205,7 +204,7 @@ public class ShuffleManager implements FetcherCallback {
     
     this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
   
-    completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
+    completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>(numInputs));
     /**
      * In case of pipelined shuffle, it is possible to get multiple FetchedInput per attempt.
      * We do not know upfront the number of spills from source.
@@ -266,7 +265,7 @@ public class ShuffleManager implements FetcherCallback {
 
     Arrays.sort(this.localDisks);
 
-    shuffleInfoEventsMap = new ConcurrentHashMap<InputIdentifier, ShuffleEventInfo>();
+    shuffleInfoEventsMap = new ConcurrentHashMap<Integer, ShuffleEventInfo>();
 
     LOG.info(srcNameTrimmed + ": numInputs=" + numInputs + ", compressionCodec="
         + (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers="
@@ -479,7 +478,7 @@ public class ShuffleManager implements FetcherCallback {
       return;
     }
 
-    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+    int inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
     if (shuffleInfoEventsMap.get(inputIdentifier) == null) {
       shuffleInfoEventsMap.put(inputIdentifier, new ShuffleEventInfo(srcAttemptIdentifier));
     }
@@ -501,7 +500,7 @@ public class ShuffleManager implements FetcherCallback {
 
   public void addCompletedInputWithNoData(
       InputAttemptIdentifier srcAttemptIdentifier) {
-    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+    int inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
     if (LOG.isDebugEnabled()) {
       LOG.debug("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
     }
@@ -558,7 +557,7 @@ public class ShuffleManager implements FetcherCallback {
 
 
     ShuffleEventInfo(InputAttemptIdentifier input) {
-      this.id = input.getInputIdentifier().getInputIndex() + "_" + input.getAttemptNumber();
+      this.id = input.getInputIdentifier() + "_" + input.getAttemptNumber();
       this.eventsProcessed = new BitSet();
       this.attemptNum = input.getAttemptNumber();
     }
@@ -594,7 +593,7 @@ public class ShuffleManager implements FetcherCallback {
   public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier,
       FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration)
       throws IOException {
-    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+    int inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
 
     // Count irrespective of whether this is a copy of an already fetched input
     lock.lock();
@@ -706,7 +705,7 @@ public class ShuffleManager implements FetcherCallback {
       return;
     }
 
-    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+    int inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
     ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(inputIdentifier);
 
     //for empty partition case
@@ -769,9 +768,9 @@ public class ShuffleManager implements FetcherCallback {
         "Fetch failure while fetching from "
             + TezRuntimeUtils.getTaskAttemptIdentifier(
             inputContext.getSourceVertexName(),
-            srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
+            srcAttemptIdentifier.getInputIdentifier(),
             srcAttemptIdentifier.getAttemptNumber()),
-        srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
+        srcAttemptIdentifier.getInputIdentifier(),
         srcAttemptIdentifier.getAttemptNumber());
 
     List<Event> failedEvents = Lists.newArrayListWithCapacity(1);

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
index ec1f8eb..7276f74 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetchedInputAllocatorOrderedGrouped.java
@@ -16,6 +16,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.FileChunk;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 
@@ -29,6 +30,8 @@ public interface FetchedInputAllocatorOrderedGrouped {
 
   void closeInMemoryFile(MapOutput mapOutput);
 
+  FileSystem getLocalFileSystem();
+
   void closeOnDiskFile(FileChunk file);
 
   void unreserve(long bytes);

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
index 75c552e..7860377 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -37,9 +38,103 @@ import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
 @InterfaceStability.Unstable
 public class InMemoryReader extends Reader {
 
+  private static class ByteArrayDataInput extends ByteArrayInputStream implements DataInput {
+
+    public ByteArrayDataInput(byte buf[], int offset, int length) {
+      super(buf, offset, length);
+    }
+
+    public void reset(byte[] input, int start, int length) {
+      this.buf = input;
+      this.count = start+length;
+      this.mark = start;
+      this.pos = start;
+    }
+
+    public byte[] getData() { return buf; }
+    public int getPosition() { return pos; }
+    public int getLength() { return count; }
+    public int getMark() { return mark; }
+
+    @Override
+    public void readFully(byte[] b) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void readFully(byte[] b, int off, int len) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int skipBytes(int n) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean readBoolean() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte readByte() throws IOException {
+      return (byte)read();
+    }
+
+    @Override
+    public int readUnsignedByte() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public short readShort() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int readUnsignedShort() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public char readChar() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int readInt() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long readLong() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public float readFloat() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public double readDouble() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String readLine() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String readUTF() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
+
   private final InputAttemptIdentifier taskAttemptId;
   private final MergeManager merger;
-  DataInputBuffer memDataIn = new DataInputBuffer();
+  ByteArrayDataInput memDataIn;
   private int start;
   private int length;
   private int originalKeyPos;
@@ -49,12 +144,12 @@ public class InMemoryReader extends Reader {
       int length)
       throws IOException {
     super(null, length - start, null, null, null, false, 0, -1);
-    this.merger = merger;
     this.taskAttemptId = taskAttemptId;
+    this.merger = merger;
 
     buffer = data;
     bufferSize = (int) length;
-    memDataIn.reset(buffer, start, length);
+    memDataIn = new ByteArrayDataInput(buffer, start, length);
     this.start = start;
     this.length = length;
   }
@@ -160,7 +255,6 @@ public class InMemoryReader extends Reader {
 
   public void close() {
     // Release
-    dataIn = null;
     buffer = null;
     // Inform the MergeManager
     if (merger != null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
index f19cd55..7e3d983 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
@@ -47,17 +47,14 @@ class MapOutput {
   private final int id;
   private final Type type;
   private InputAttemptIdentifier attemptIdentifier;
-  private final long size;
 
   private final boolean primaryMapOutput;
   private final FetchedInputAllocatorOrderedGrouped callback;
 
   // MEMORY
-  private final byte[] memory;
   private BoundedByteArrayOutputStream byteStream;
 
   // DISK
-  private final FileSystem localFS;
   private final Path tmpOutputPath;
   private final FileChunk outputPath;
   private OutputStream disk;
@@ -71,18 +68,13 @@ class MapOutput {
     this.callback = callback;
     this.primaryMapOutput = primaryMapOutput;
 
-    this.localFS = fs;
-    this.size = size;
-
     // Other type specific values
 
     if (type == Type.MEMORY) {
       // since we are passing an int from createMemoryMapOutput, its safe to cast to int
       this.byteStream = new BoundedByteArrayOutputStream((int)size);
-      this.memory = byteStream.getBuffer();
     } else {
       this.byteStream = null;
-      this.memory = null;
     }
 
     this.tmpOutputPath = tmpOutputPath;
@@ -97,7 +89,6 @@ class MapOutput {
     } else {
       this.outputPath = null;
     }
-
   }
 
   public static MapOutput createDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
@@ -107,7 +98,7 @@ class MapOutput {
       IOException {
     FileSystem fs = FileSystem.getLocal(conf).getRaw();
     Path outputpath = mapOutputFile.getInputFileForWrite(
-        attemptIdentifier.getInputIdentifier().getInputIndex(), attemptIdentifier.getSpillEventId(), size);
+        attemptIdentifier.getInputIdentifier(), attemptIdentifier.getSpillEventId(), size);
     // Files are not clobbered due to the id being appended to the outputPath in the tmpPath,
     // otherwise fetches for the same task but from different attempts would clobber each other.
     Path tmpOuputPath = outputpath.suffix(String.valueOf(fetcher));
@@ -115,7 +106,7 @@ class MapOutput {
 
     MapOutput mapOutput = new MapOutput(Type.DISK, attemptIdentifier, callback, size, outputpath, offset,
         primaryMapOutput, fs, tmpOuputPath);
-    mapOutput.disk = mapOutput.localFS.create(tmpOuputPath);
+    mapOutput.disk = fs.create(tmpOuputPath);
 
     return mapOutput;
   }
@@ -160,7 +151,7 @@ class MapOutput {
   }
 
   public byte[] getMemory() {
-    return memory;
+    return byteStream.getBuffer();
   }
 
   public BoundedByteArrayOutputStream getArrayStream() {
@@ -180,14 +171,19 @@ class MapOutput {
   }
 
   public long getSize() {
-    return size;
+    if (type == Type.MEMORY) {
+      return byteStream.getLimit();
+    } else if (type == Type.DISK || type == Type.DISK_DIRECT) {
+      return outputPath.getLength();
+    }
+    return -1;
   }
 
   public void commit() throws IOException {
     if (type == Type.MEMORY) {
       callback.closeInMemoryFile(this);
     } else if (type == Type.DISK) {
-      localFS.rename(tmpOutputPath, outputPath.getPath());
+      callback.getLocalFileSystem().rename(tmpOutputPath, outputPath.getPath());
       callback.closeOnDiskFile(outputPath);
     } else if (type == Type.DISK_DIRECT) {
       callback.closeOnDiskFile(outputPath);
@@ -198,10 +194,10 @@ class MapOutput {
   
   public void abort() {
     if (type == Type.MEMORY) {
-      callback.unreserve(memory.length);
+      callback.unreserve(byteStream.getBuffer().length);
     } else if (type == Type.DISK) {
       try {
-        localFS.delete(tmpOutputPath, false);
+        callback.getLocalFileSystem().delete(tmpOutputPath, true);
       } catch (IOException ie) {
         LOG.info("failure to clean up " + tmpOutputPath, ie);
       }
@@ -223,9 +219,9 @@ class MapOutput {
         return 0;
       }
       
-      if (o1.size < o2.size) {
+      if (o1.getSize() < o2.getSize()) {
         return -1;
-      } else if (o1.size > o2.size) {
+      } else if (o1.getSize() > o2.getSize()) {
         return 1;
       }
       

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 61ff338..dfa509f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -503,6 +503,11 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
   }
 
   @Override
+  public FileSystem getLocalFileSystem() {
+    return localFS;
+  }
+
+  @Override
   public synchronized void closeOnDiskFile(FileChunk file) {
     //including only path & offset for valdiations.
     for (FileChunk fileChunk : onDiskMapOutputs) {
@@ -726,7 +731,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       // All disk writes done by this merge are overhead - due to the lack of
       // adequate memory to keep all segments in memory.
       outputPath = mapOutputFile.getInputFileForWrite(
-          srcTaskIdentifier.getInputIdentifier().getInputIndex(), srcTaskIdentifier.getSpillEventId(),
+          srcTaskIdentifier.getInputIdentifier(), srcTaskIdentifier.getSpillEventId(),
           mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
 
       Writer writer = null;
@@ -863,7 +868,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       if (file0.isLocalFile()) {
         // This is setup the same way a type DISK MapOutput is setup when fetching.
         namePart = mapOutputFile.getSpillFileName(
-            file0.getInputAttemptIdentifier().getInputIdentifier().getInputIndex(),
+            file0.getInputAttemptIdentifier().getInputIdentifier(),
             file0.getInputAttemptIdentifier().getSpillEventId());
       } else {
         namePart = file0.getPath().getName().toString();
@@ -1032,7 +1037,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
     long inMemToDiskBytes = 0;
     boolean mergePhaseFinished = false;
     if (inMemoryMapOutputs.size() > 0) {
-      int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getInputIndex();
+      int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier();
       inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs,
                                                 memDiskSegments,
                                                 this.postMergeMemLimit);

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index f8c9553..6e6d967 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -37,7 +37,6 @@ import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 
@@ -170,7 +169,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
       InputAttemptIdentifier.SPILL_INFO info = (lastEvent) ? InputAttemptIdentifier.SPILL_INFO
           .FINAL_UPDATE : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE;
       srcAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(dmEvent.getTargetIndex()), dmEvent
+          new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent
               .getVersion(), pathComponent, false, info, spillEventId);
     } else {
       srcAttemptIdentifier =

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index dcfb274..8cba2a6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -71,7 +71,6 @@ import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
@@ -111,7 +110,7 @@ class ShuffleScheduler {
   //To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is
   // enabled in source.
   @VisibleForTesting
-  final Map<InputIdentifier, ShuffleEventInfo> pipelinedShuffleInfoEventsMap;
+  final Map<Integer, ShuffleEventInfo> pipelinedShuffleInfoEventsMap;
 
   @VisibleForTesting
   final Set<MapHost> pendingHosts = new HashSet<MapHost>();
@@ -349,7 +348,7 @@ class ShuffleScheduler {
     this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
     this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
 
-    pipelinedShuffleInfoEventsMap = new HashMap<InputIdentifier, ShuffleEventInfo>();
+    pipelinedShuffleInfoEventsMap = new HashMap<Integer, ShuffleEventInfo>();
     LOG.info("ShuffleScheduler running for sourceVertex: "
         + inputContext.getSourceVertexName() + " with configuration: "
         + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting
@@ -429,7 +428,7 @@ class ShuffleScheduler {
 
 
     ShuffleEventInfo(InputAttemptIdentifier input) {
-      this.id = input.getInputIdentifier().getInputIndex() + "_" + input.getAttemptNumber();
+      this.id = input.getInputIdentifier() + "_" + input.getAttemptNumber();
       this.eventsProcessed = new BitSet();
       this.attemptNum = input.getAttemptNumber();
     }
@@ -467,7 +466,7 @@ class ShuffleScheduler {
                                          ) throws IOException {
 
     inputContext.notifyProgress();
-    if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
+    if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier())) {
       if (!isLocalFetch) {
         /**
          * Reset it only when it is a non-local-disk copy.
@@ -505,10 +504,10 @@ class ShuffleScheduler {
        */
       if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
         remainingMaps.decrementAndGet();
-        setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex());
+        setInputFinished(srcAttemptIdentifier.getInputIdentifier());
         numFetchedSpills++;
       } else {
-        InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+        int inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
         //Allow only one task attempt to proceed.
         if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) {
           return;
@@ -533,7 +532,7 @@ class ShuffleScheduler {
         //check if we downloaded all spills pertaining to this InputAttemptIdentifier
         if (eventInfo.isDone()) {
           remainingMaps.decrementAndGet();
-          setInputFinished(inputIdentifier.getInputIndex());
+          setInputFinished(inputIdentifier);
           pipelinedShuffleInfoEventsMap.remove(inputIdentifier);
           if (LOG.isTraceEnabled()) {
             LOG.trace("Removing : " + srcAttemptIdentifier + ", pending: " +
@@ -560,7 +559,7 @@ class ShuffleScheduler {
       if (LOG.isDebugEnabled()) {
         LOG.debug("src task: "
             + TezRuntimeUtils.getTaskAttemptIdentifier(
-                inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
+                inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier(),
                 srcAttemptIdentifier.getAttemptNumber()) + " done");
       }
     } else {
@@ -679,7 +678,7 @@ class ShuffleScheduler {
       String errorMsg = "Failed " + attemptFailures + " times trying to "
           + "download from " + TezRuntimeUtils.getTaskAttemptIdentifier(
           inputContext.getSourceVertexName(),
-          srcAttempt.getInputIdentifier().getInputIndex(),
+          srcAttempt.getInputIdentifier(),
           srcAttempt.getAttemptNumber()) + ". threshold=" + abortFailureLimit;
       IOException ioe = new IOException(errorMsg);
       // Shuffle knows how to deal with failures post shutdown via the onFailure hook
@@ -738,15 +737,15 @@ class ShuffleScheduler {
         srcNameTrimmed + ": " + "Reporting fetch failure for InputIdentifier: "
             + srcAttempt + " taskAttemptIdentifier: " + TezRuntimeUtils
             .getTaskAttemptIdentifier(inputContext.getSourceVertexName(),
-                srcAttempt.getInputIdentifier().getInputIndex(),
+                srcAttempt.getInputIdentifier(),
                 srcAttempt.getAttemptNumber()) + " to AM.");
     List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
     failedEvents.add(InputReadErrorEvent.create(
         "Fetch failure for " + TezRuntimeUtils
             .getTaskAttemptIdentifier(inputContext.getSourceVertexName(),
-                srcAttempt.getInputIdentifier().getInputIndex(),
+                srcAttempt.getInputIdentifier(),
                 srcAttempt.getAttemptNumber()) + " to jobtracker.",
-        srcAttempt.getInputIdentifier().getInputIndex(),
+        srcAttempt.getInputIdentifier(),
         srcAttempt.getAttemptNumber()));
 
     inputContext.sendEvents(failedEvents);
@@ -1014,7 +1013,7 @@ class ShuffleScheduler {
   
   private boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
     return (!obsoleteInputs.contains(id) && 
-             !isInputFinished(id.getInputIdentifier().getInputIndex()));
+             !isInputFinished(id.getInputIdentifier()));
   }
 
   public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
@@ -1029,7 +1028,7 @@ class ShuffleScheduler {
       // This may be removed after TEZ-914
       InputAttemptIdentifier id = listItr.next();
       if (inputShouldBeConsumed(id)) {
-        Integer inputNumber = Integer.valueOf(id.getInputIdentifier().getInputIndex());
+        Integer inputNumber = Integer.valueOf(id.getInputIdentifier());
         List<InputAttemptIdentifier> oldIdList = dedupedList.get(inputNumber);
 
         if (oldIdList == null || oldIdList.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index 20f44dd..a99eb5e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -496,7 +496,7 @@ public class IFile {
 
     protected byte[] buffer = null;
     protected int bufferSize = DEFAULT_BUFFER_SIZE;
-    protected DataInputStream dataIn;
+    protected DataInputStream dataIn = null;
 
     protected int recNo = 1;
     protected int originalKeyLength;
@@ -583,7 +583,9 @@ public class IFile {
         this.in = null;
       }
 
-      this.dataIn = new DataInputStream(this.in);
+      if (in != null) {
+        this.dataIn = new DataInputStream(this.in);
+      }
       this.readRecordsCounter = readsCounter;
       this.bytesReadCounter = bytesReadCounter;
       this.fileLength = length;

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index 917dbcb..0aa112e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.junit.Assert;
 import org.junit.Test;
@@ -236,36 +235,36 @@ public class TestFetcher {
   @Test(timeout=5000)
   public void testInputAttemptIdentifierMap() {
     InputAttemptIdentifier[] srcAttempts = {
-        new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+        new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
             false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
             //duplicate entry
-        new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+        new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
             false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
         // pipeline shuffle based identifiers, with multiple attempts
-        new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+        new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
             false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
-        new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+        new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
             false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
-        new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
+        new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
             false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
-        new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+        new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
             false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
-        new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+        new InputAttemptIdentifier(2, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
             false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
     };
     InputAttemptIdentifier[] expectedSrcAttempts = {
-        new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+        new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
             false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
         // pipeline shuffle based identifiers
-        new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+        new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
             false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
-        new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+        new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
             false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
-        new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
+        new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
             false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
-        new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+        new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
             false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
-        new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+        new InputAttemptIdentifier(2, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
             false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
     };
     TezConfiguration conf = new TezConfiguration();

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
index c452898..5bbf0fb 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
@@ -53,7 +53,6 @@ import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
@@ -215,7 +214,7 @@ public class TestShuffleInputEventHandlerImpl {
     Event dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0);
     handler.handleEvents(Collections.singletonList(dme));
 
-    InputAttemptIdentifier expectedId1 = new InputAttemptIdentifier(new InputIdentifier(1), 0,
+    InputAttemptIdentifier expectedId1 = new InputAttemptIdentifier(1, 0,
         PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
     verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId1), eq(0));
 
@@ -223,7 +222,7 @@ public class TestShuffleInputEventHandlerImpl {
     dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0);
     handler.handleEvents(Collections.singletonList(dme));
 
-    InputAttemptIdentifier expectedId2 = new InputAttemptIdentifier(new InputIdentifier(1), 0,
+    InputAttemptIdentifier expectedId2 = new InputAttemptIdentifier(1, 0,
         PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1);
     verify(shuffleManager, times(2)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId2), eq(0));
 
@@ -252,7 +251,7 @@ public class TestShuffleInputEventHandlerImpl {
     Event dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1);
     handler.handleEvents(Collections.singletonList(dme));
 
-    InputAttemptIdentifier expected = new InputAttemptIdentifier(new InputIdentifier(1), 1,
+    InputAttemptIdentifier expected = new InputAttemptIdentifier(1, 1,
         PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1);
     verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expected), eq(0));
 
@@ -283,14 +282,14 @@ public class TestShuffleInputEventHandlerImpl {
     Event dme = createDataMovementEvent(true, 0, 1, 0, false, bitSet, 4, 0);
     handler.handleEvents(Collections.singletonList(dme));
 
-    InputAttemptIdentifier expected = new InputAttemptIdentifier(new InputIdentifier(1), 0,
+    InputAttemptIdentifier expected = new InputAttemptIdentifier(1, 0,
         PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
     verify(shuffleManager, times(1)).addCompletedInputWithNoData(expected);
 
     //0--> 1 with spill id 1 (attemptNum 0)
     handler.handleEvents(Collections.singletonList(dme));
     dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0);
-    expected = new InputAttemptIdentifier(new InputIdentifier(1), 0,
+    expected = new InputAttemptIdentifier(1, 0,
         PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1);
     verify(shuffleManager, times(2)).addCompletedInputWithNoData(expected);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index faa2d31..20fb9a9 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -53,7 +53,6 @@ import com.google.common.collect.Lists;
 import org.apache.tez.http.HttpConnection;
 import org.apache.tez.http.HttpConnectionParams;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -116,7 +115,7 @@ public class TestFetcher {
     doReturn("src vertex").when(inputContext).getSourceVertexName();
 
     MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
-    InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(new InputIdentifier(0), 0, "attempt");
+    InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(0, 0, "attempt");
     mapHost.addKnownMap(inputAttemptIdentifier);
     List<InputAttemptIdentifier> mapsForHost = Lists.newArrayList(inputAttemptIdentifier);
     doReturn(mapsForHost).when(scheduler).getMapsForHost(mapHost);
@@ -484,36 +483,36 @@ public class TestFetcher {
   @Test(timeout = 5000)
   public void testInputAttemptIdentifierMap() {
     InputAttemptIdentifier[] srcAttempts = {
-      new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+      new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
           false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
           //duplicate entry
-      new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+      new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
           false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
       // pipeline shuffle based identifiers, with multiple attempts
-      new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+      new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
           false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
-      new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+      new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
           false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
-      new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
+      new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
           false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
-      new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+      new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
           false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
-      new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+      new InputAttemptIdentifier(2, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
           false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
     };
     InputAttemptIdentifier[] expectedSrcAttempts = {
-      new InputAttemptIdentifier(new InputIdentifier(0), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
+      new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
           false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
       // pipeline shuffle based identifiers
-      new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+      new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
           false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
-      new InputAttemptIdentifier(new InputIdentifier(1), 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
+      new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
           false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
-      new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
+      new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
           false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
-      new InputAttemptIdentifier(new InputIdentifier(1), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+      new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
           false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
-      new InputAttemptIdentifier(new InputIdentifier(2), 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
+      new InputAttemptIdentifier(2, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
           false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
     };
 

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index 88a1d20..de066fe 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -17,7 +17,6 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.junit.Before;
 import org.junit.Test;
@@ -165,7 +164,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     int inputIdx = 0;
     Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0);
     InputAttemptIdentifier id1 =
-        new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum,
+        new InputAttemptIdentifier(inputIdx, attemptNum,
             PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
     handler.handleEvents(Collections.singletonList(dme1));
     String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
@@ -176,7 +175,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     //Send final_update event.
     Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, false, 1);
     InputAttemptIdentifier id2 =
-        new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum,
+        new InputAttemptIdentifier(inputIdx, attemptNum,
             PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1);
     handler.handleEvents(Collections.singletonList(dme2));
     baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
@@ -202,14 +201,14 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     inputIdx = 1;
     Event dme3 = createDataMovementEvent(attemptNum, inputIdx, null, false, true,
         true, 1);
-    InputAttemptIdentifier id3 = new InputAttemptIdentifier(new InputIdentifier(inputIdx),
+    InputAttemptIdentifier id3 = new InputAttemptIdentifier(inputIdx,
         attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE,
         0);
     handler.handleEvents(Collections.singletonList(dme3));
     //Send final_update event (empty partition directly invoking copySucceeded).
-    InputAttemptIdentifier id4 = new InputAttemptIdentifier(new InputIdentifier(inputIdx),
+    InputAttemptIdentifier id4 = new InputAttemptIdentifier(inputIdx,
         attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1);
-    assertTrue(!scheduler.isInputFinished(id4.getInputIdentifier().getInputIndex()));
+    assertTrue(!scheduler.isInputFinished(id4.getInputIdentifier()));
     scheduler.copySucceeded(id4, null, 0, 0, 0, null, false);
     assertTrue(!scheduler.isDone()); //we haven't downloaded another id yet
     //Let the incremental event pass
@@ -229,7 +228,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     handler.handleEvents(Collections.singletonList(dme1));
 
     InputAttemptIdentifier id1 =
-        new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum,
+        new InputAttemptIdentifier(inputIdx, attemptNum,
             PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
 
     verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(baseUri), eq(id1));
@@ -243,7 +242,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     handler.handleEvents(Collections.singletonList(dme2));
 
     InputAttemptIdentifier id2 =
-        new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum,
+        new InputAttemptIdentifier(inputIdx, attemptNum,
             PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
     verify(scheduler, times(1)).reportExceptionForInput(any(IOException.class));
   }
@@ -329,4 +328,4 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     }
     return TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(bitSet));
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/3ff360aa/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index 1a6c3be..f7ef309 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -50,7 +50,6 @@ import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -89,7 +88,7 @@ public class TestShuffleScheduler {
       // Schedule all copies.
       for (int i = 0; i < numInputs; i++) {
         InputAttemptIdentifier inputAttemptIdentifier =
-            new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+            new InputAttemptIdentifier(i, 0, "attempt_");
         scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier);
         identifiers[i] = inputAttemptIdentifier;
       }
@@ -134,7 +133,7 @@ public class TestShuffleScheduler {
 
       for (int i = 0; i < numInputs; i++) {
         InputAttemptIdentifier inputAttemptIdentifier =
-            new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+            new InputAttemptIdentifier(i, 0, "attempt_");
         scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier);
         identifiers[i] = inputAttemptIdentifier;
       }
@@ -191,7 +190,7 @@ public class TestShuffleScheduler {
     //Generate 320 events
     for (int i = 0; i < 320; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
           10000, i, "hostUrl", inputAttemptIdentifier);
     }
@@ -199,7 +198,7 @@ public class TestShuffleScheduler {
     //100 succeeds
     for (int i = 0; i < 100; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       MapOutput mapOutput = MapOutput
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
@@ -210,14 +209,14 @@ public class TestShuffleScheduler {
     //99 fails
     for (int i = 100; i < 199; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
           + ":" + 10000, ""), false, true, false);
     }
 
 
     InputAttemptIdentifier inputAttemptIdentifier =
-        new InputAttemptIdentifier(new InputIdentifier(200), 0, "attempt_");
+        new InputAttemptIdentifier(200, 0, "attempt_");
 
     //Should fail here and report exception as reducer is not healthy
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost(200, "host" + (200 %
@@ -260,7 +259,7 @@ public class TestShuffleScheduler {
     //Generate 0-200 events
     for (int i = 0; i < 200; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
           10000, i, "hostUrl", inputAttemptIdentifier);
     }
@@ -269,7 +268,7 @@ public class TestShuffleScheduler {
     //Generate 200-320 events with empty partitions
     for (int i = 200; i < 320; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.copySucceeded(inputAttemptIdentifier, null, 0, 0, 0, null, true);
     }
     //120 are successful. so remaining is 200
@@ -279,7 +278,7 @@ public class TestShuffleScheduler {
     //200 pending to be downloaded. Download 190.
     for (int i = 0; i < 190; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       MapOutput mapOutput = MapOutput
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
@@ -292,7 +291,7 @@ public class TestShuffleScheduler {
     //10 fails
     for (int i = 190; i < 200; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
           + ":" + 10000, ""), false, true, false);
     }
@@ -304,7 +303,7 @@ public class TestShuffleScheduler {
     scheduler.lastProgressTime = System.currentTimeMillis() - 250000;
 
     InputAttemptIdentifier inputAttemptIdentifier =
-        new InputAttemptIdentifier(new InputIdentifier(190), 0, "attempt_");
+        new InputAttemptIdentifier(190, 0, "attempt_");
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost(190, "host" +
         (190 % totalProducerNodes)
         + ":" + 10000, ""), false, true, false);
@@ -317,7 +316,7 @@ public class TestShuffleScheduler {
     //fail to download 50 more times across attempts
     for (int i = 190; i < 200; i++) {
       inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
           + ":" + 10000, ""), false, true, false);
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
@@ -338,7 +337,7 @@ public class TestShuffleScheduler {
     //fail another 30
     for (int i = 110; i < 120; i++) {
       inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
           + ":" + 10000, ""), false, true, false);
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
@@ -376,7 +375,7 @@ public class TestShuffleScheduler {
     //Generate 320 events
     for (int i = 0; i < 320; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
           10000, i, "hostUrl", inputAttemptIdentifier);
     }
@@ -384,7 +383,7 @@ public class TestShuffleScheduler {
     //319 succeeds
     for (int i = 0; i < 319; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       MapOutput mapOutput = MapOutput
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
@@ -394,7 +393,7 @@ public class TestShuffleScheduler {
 
     //1 fails (last fetch)
     InputAttemptIdentifier inputAttemptIdentifier =
-        new InputAttemptIdentifier(new InputIdentifier(319), 0, "attempt_");
+        new InputAttemptIdentifier(319, 0, "attempt_");
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes)
         + ":" + 10000, ""), false, true, false);
 
@@ -441,7 +440,7 @@ public class TestShuffleScheduler {
     //Generate 320 events
     for (int i = 0; i < 320; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
           10000, i, "hostUrl", inputAttemptIdentifier);
     }
@@ -449,7 +448,7 @@ public class TestShuffleScheduler {
     //Tasks fail in 20% of nodes 3 times, but are able to proceed further
     for (int i = 0; i < 64; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
 
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i %
           totalProducerNodes) + ":" + 10000, ""), false, true, false);
@@ -470,7 +469,7 @@ public class TestShuffleScheduler {
       //319 succeeds
     for (int i = 64; i < 319; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       MapOutput mapOutput = MapOutput
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
@@ -480,7 +479,7 @@ public class TestShuffleScheduler {
 
     //1 fails (last fetch)
     InputAttemptIdentifier inputAttemptIdentifier =
-        new InputAttemptIdentifier(new InputIdentifier(319), 0, "attempt_");
+        new InputAttemptIdentifier(319, 0, "attempt_");
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes)
         + ":" + 10000, ""), false, true, false);
 
@@ -536,7 +535,7 @@ public class TestShuffleScheduler {
     //Generate 319 events (last event has not arrived)
     for (int i = 0; i < 319; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
           10000, i, "hostUrl", inputAttemptIdentifier);
     }
@@ -544,7 +543,7 @@ public class TestShuffleScheduler {
     //318 succeeds
     for (int i = 0; i < 319; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       MapOutput mapOutput = MapOutput
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
@@ -554,7 +553,7 @@ public class TestShuffleScheduler {
 
     //1 fails (last fetch)
     InputAttemptIdentifier inputAttemptIdentifier =
-        new InputAttemptIdentifier(new InputIdentifier(318), 0, "attempt_");
+        new InputAttemptIdentifier(318, 0, "attempt_");
     scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 % totalProducerNodes)
         + ":" + 10000, ""), false, true, false);
 
@@ -615,7 +614,7 @@ public class TestShuffleScheduler {
     //Generate 320 events (last event has not arrived)
     for (int i = 0; i < 320; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes),
           10000, i, "hostUrl", inputAttemptIdentifier);
     }
@@ -623,7 +622,7 @@ public class TestShuffleScheduler {
     //10 succeeds
     for (int i = 0; i < 10; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       MapOutput mapOutput = MapOutput
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
@@ -634,7 +633,7 @@ public class TestShuffleScheduler {
     //5 fetches fail once
     for (int i = 10; i < 15; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
           + ":" + 10000, ""), false, true, false);
     }
@@ -648,7 +647,7 @@ public class TestShuffleScheduler {
     //5 fetches fail repeatedly
     for (int i = 10; i < 15; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
           + ":" + 10000, ""), false, true, false);
       scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes)
@@ -691,7 +690,7 @@ public class TestShuffleScheduler {
     //Generate 320 events
     for (int i = 0; i < 320; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i,
           "hostUrl", inputAttemptIdentifier);
     }
@@ -699,7 +698,7 @@ public class TestShuffleScheduler {
     //100 succeeds
     for (int i = 0; i < 100; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       MapOutput mapOutput = MapOutput
           .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class),
               100, false);
@@ -711,7 +710,7 @@ public class TestShuffleScheduler {
     //99 fails
     for (int i = 100; i < 199; i++) {
       InputAttemptIdentifier inputAttemptIdentifier =
-          new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+          new InputAttemptIdentifier(i, 0, "attempt_");
       scheduler.copyFailed(inputAttemptIdentifier,
           new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""),
           false, true, false);
@@ -754,7 +753,7 @@ public class TestShuffleScheduler {
     final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 1, shuffle);
 
     InputAttemptIdentifier inputAttemptIdentifier =
-        new InputAttemptIdentifier(new InputIdentifier(0), 0, "attempt_");
+        new InputAttemptIdentifier(0, 0, "attempt_");
     scheduler.addKnownMapOutput("host0", 10000, 0, "hostUrl", inputAttemptIdentifier);
 
     assertTrue(scheduler.pendingHosts.size() == 1);
@@ -801,7 +800,7 @@ public class TestShuffleScheduler {
 
       for (int i = 0; i < numInputs; i++) {
         InputAttemptIdentifier inputAttemptIdentifier =
-            new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_");
+            new InputAttemptIdentifier(i, 0, "attempt_");
         scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier);
         identifiers[i] = inputAttemptIdentifier;
       }