You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/12/12 02:49:59 UTC

[2/5] tez git commit: TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts instead of tasks (bikas) (cherry picked from commit 7b45e9a142830e7dd8b0263d50dbaaef5fb0da76)

http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
index b164a6d..d59439e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -23,7 +23,6 @@ import static org.mockito.Mockito.*;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -34,9 +33,10 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -44,13 +44,13 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.MockitoAnnotations;
 
-import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
 
 @SuppressWarnings("unchecked")
 public class TestInputReadyVertexManager {
   
   @Captor
-  ArgumentCaptor<List<TaskWithLocationHint>> requestCaptor;
+  ArgumentCaptor<List<ScheduleTaskRequest>> requestCaptor;
   
   @Before
   public void init() {
@@ -77,23 +77,23 @@ public class TestInputReadyVertexManager {
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
     mockInputVertices.put(mockSrcVertexId1, eProp1);
-    
-    Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
-    initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
-    
+
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
     verify(mockContext, times(1)).vertexReconfigurationPlanned();
     // source vertex configured
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     verify(mockContext, times(1)).doneReconfiguringVertex();
-    verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+    verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture());
     // then own vertex started
-    manager.onVertexStarted(initialCompletions);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
-    verify(mockContext, times(0)).scheduleVertexTasks(anyList());
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
-    verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
+    manager.onVertexStarted(Collections.singletonList(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0)));
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+    verify(mockContext, times(0)).scheduleTasks(anyList());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+    verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture());
     Assert.assertEquals(2, requestCaptor.getValue().size());
   }
   
@@ -118,36 +118,36 @@ public class TestInputReadyVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
     mockInputVertices.put(mockSrcVertexId1, eProp1);
     
-    Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
-    initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
-    
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
     verify(mockContext, times(1)).vertexReconfigurationPlanned();
     // source vertex configured
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     verify(mockContext, times(1)).doneReconfiguringVertex();
-    verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
-    manager.onVertexStarted(initialCompletions);
-    verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
+    verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture());
+    manager.onVertexStarted(Collections.singletonList(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0)));
+    verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
-    Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+    Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex());
     Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getVertexName());
     Assert.assertEquals(0, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getTaskIndex());
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
-    verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+    verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
-    Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+    Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex());
     Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getVertexName());
     Assert.assertEquals(1, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getTaskIndex());
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
-    verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+    verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
-    Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+    Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex());
     Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getVertexName());
     Assert.assertEquals(2, requestCaptor.getValue().get(0)
@@ -175,28 +175,28 @@ public class TestInputReadyVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
     mockInputVertices.put(mockSrcVertexId1, eProp1);
     
-    Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
-    initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
-    
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
     verify(mockContext, times(1)).vertexReconfigurationPlanned();
-    verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+    verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture());
     // ok to have source task complete before anything else
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
     // first own vertex started
-    manager.onVertexStarted(initialCompletions);
+    manager.onVertexStarted(Collections.singletonList(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0)));
     // no scheduling as we are not configured yet
-    verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+    verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture());
     // then source vertex configured. now we start
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     verify(mockContext, times(1)).doneReconfiguringVertex();
     
-    verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
-    verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
+    verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+    verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
-    Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+    Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex());
     Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getVertexName());
     Assert.assertEquals(2, requestCaptor.getValue().get(0)
@@ -247,7 +247,7 @@ public class TestInputReadyVertexManager {
     mockInputVertices.put(mockSrcVertexId2, eProp2);
     mockInputVertices.put(mockSrcVertexId3, eProp3);
     
-    Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
+    List<TaskAttemptIdentifier> initialCompletions = Lists.newArrayList();
     
     // 1-1 sources do not match managed tasks. setParallelism called to make them match
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
@@ -280,8 +280,8 @@ public class TestInputReadyVertexManager {
     
     when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
     
-    initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
-    initialCompletions.put(mockSrcVertexId2, Collections.singletonList(0));
+    initialCompletions.add(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+    initialCompletions.add(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
     verify(mockContext, times(3)).vertexReconfigurationPlanned();
@@ -293,44 +293,53 @@ public class TestInputReadyVertexManager {
     verify(mockContext, times(2)).doneReconfiguringVertex();
     manager.onVertexStarted(initialCompletions);
     // all 1-1 0's done but not scheduled because v1 is not done
-    manager.onSourceTaskCompleted(mockSrcVertexId3, 0);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 1); // duplicate
-    manager.onSourceTaskCompleted(mockSrcVertexId2, 1);
-    verify(mockContext, times(0)).scheduleVertexTasks(anyList());
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 2); // v1 done
-    verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 0));
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // duplicate
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
+    verify(mockContext, times(0)).scheduleTasks(anyList());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2)); // v1 done
+    verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
-    Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+    Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex());
     Assert.assertEquals(mockSrcVertexId3, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getVertexName());
     Assert.assertEquals(0, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion
     // 1-1 completion triggers since other 1-1 is done
-    manager.onSourceTaskCompleted(mockSrcVertexId3, 1);
-    verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 1));
+    verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
-    Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+    Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex());
     Assert.assertEquals(mockSrcVertexId3, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getVertexName());
     Assert.assertEquals(1, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion
     // 1-1 completion does not trigger since other 1-1 is not done
-    manager.onSourceTaskCompleted(mockSrcVertexId3, 2);
-    verify(mockContext, times(2)).scheduleVertexTasks(anyList());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 2));
+    verify(mockContext, times(2)).scheduleTasks(anyList());
     // 1-1 completion trigger start
-    manager.onSourceTaskCompleted(mockSrcVertexId2, 2);
-    verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 2));
+    verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
-    Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+    Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex());
     Assert.assertEquals(mockSrcVertexId2, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getVertexName());
     Assert.assertEquals(2, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion
     
     // no more starts
-    manager.onSourceTaskCompleted(mockSrcVertexId3, 2);
-    verify(mockContext, times(3)).scheduleVertexTasks(anyList());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 2));
+    verify(mockContext, times(3)).scheduleTasks(anyList());
     
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index d71eba2..df08060 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -34,10 +34,18 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.TaskIdentifier;
+import org.apache.tez.runtime.api.VertexIdentifier;
+import org.apache.tez.runtime.api.VertexStatistics;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
@@ -68,6 +76,10 @@ import static org.mockito.Mockito.when;
 @SuppressWarnings({ "unchecked", "rawtypes" })
 public class TestShuffleVertexManager {
 
+  TezVertexID vertexId = TezVertexID.fromString("vertex_1436907267600_195589_1_00");
+  int taskId = 0;
+  List<TaskAttemptIdentifier> emptyCompletions = null;
+
   @Test(timeout = 5000)
   public void testShuffleVertexManagerAutoParallelism() throws Exception {
     Configuration conf = new Configuration();
@@ -156,12 +168,12 @@ public class TestShuffleVertexManager {
       public Object answer(InvocationOnMock invocation) {
           Object[] args = invocation.getArguments();
           scheduledTasks.clear();
-          List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0];
-          for (TaskWithLocationHint task : tasks) {
+          List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+          for (ScheduleTaskRequest task : tasks) {
             scheduledTasks.add(task.getTaskIndex());
           }
           return null;
-      }}).when(mockContext).scheduleVertexTasks(anyList());
+      }}).when(mockContext).scheduleTasks(anyList());
     
     final Map<String, EdgeManagerPlugin> newEdgeManagers =
         new HashMap<String, EdgeManagerPlugin>();
@@ -217,7 +229,7 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0);
     when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1);
 
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     verify(mockContext, times(2)).vertexReconfigurationPlanned();
     Assert.assertTrue(manager.bipartiteSources == 2);
     
@@ -240,7 +252,7 @@ public class TestShuffleVertexManager {
     verify(mockContext, times(1)).doneReconfiguringVertex(); // no change. will trigger after start
     Assert.assertTrue(scheduledTasks.size() == 0); // no tasks scheduled
     // trigger start and processing of pending notification events
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.bipartiteSources == 2);
     verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done
     Assert.assertTrue(manager.pendingTasks.isEmpty());
@@ -250,20 +262,18 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
 
-    ByteBuffer payload =
-        VertexManagerEventPayloadProto.newBuilder().setOutputSize(5000L).build().toByteString().asReadOnlyByteBuffer();
-    VertexManagerEvent vmEvent = VertexManagerEvent.create("Vertex", payload);
+    VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, "Vertex");
     // parallelism not change due to large data size
     manager = createManager(conf, mockContext, 0.1f, 0.1f);
     verify(mockContext, times(4)).vertexReconfigurationPlanned(); // Tez notified of reconfig
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.pendingTasks.size() == 4); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     manager.onVertexManagerEventReceived(vmEvent);
 
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     verify(mockContext, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
     verify(mockContext, times(2)).doneReconfiguringVertex();
     // trigger scheduling
@@ -281,20 +291,18 @@ public class TestShuffleVertexManager {
      * Delay determining parallelism until enough data has been received.
      */
     scheduledTasks.clear();
-    payload =
-        VertexManagerEventPayloadProto.newBuilder().setOutputSize(1L).build().toByteString().asReadOnlyByteBuffer();
-    vmEvent = VertexManagerEvent.create("Vertex", payload);
 
     //min/max fraction of 0.01/0.75 would ensure that we hit determineParallelism code path on receiving first event itself.
     manager = createManager(conf, mockContext, 0.01f, 0.75f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
     Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
 
     //First task in src1 completed with small payload
+    vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
     manager.onVertexManagerEventReceived(vmEvent); //small payload
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     Assert.assertTrue(manager.determineParallelismAndApply() == false);
     Assert.assertEquals(4, manager.pendingTasks.size());
     Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
@@ -303,8 +311,9 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(1L, manager.completedSourceTasksOutputSize);
 
     //Second task in src1 completed with small payload
+    vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
     manager.onVertexManagerEventReceived(vmEvent); //small payload
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     //Still overall data gathered has not reached threshold; So, ensure parallelism can be determined later
     Assert.assertTrue(manager.determineParallelismAndApply() == false);
     Assert.assertEquals(4, manager.pendingTasks.size());
@@ -314,17 +323,14 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(2L, manager.completedSourceTasksOutputSize);
 
     //First task in src2 completed (with larger payload) to trigger determining parallelism
-    payload =
-        VertexManagerEventPayloadProto.newBuilder().setOutputSize(1200L).build().toByteString()
-            .asReadOnlyByteBuffer();
-    vmEvent = VertexManagerEvent.create("Vertex", payload);
+    vmEvent = getVertexManagerEvent(null, 1200L, "Vertex");
     manager.onVertexManagerEventReceived(vmEvent);
     Assert.assertTrue(manager.determineParallelismAndApply()); //ensure parallelism is determined
     verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     Assert.assertEquals(1, manager.pendingTasks.size());
     Assert.assertEquals(1, scheduledTasks.size());
     Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
@@ -336,14 +342,11 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40);
     scheduledTasks.clear();
-    payload =
-        VertexManagerEventPayloadProto.newBuilder().setOutputSize(100L).build().toByteString()
-            .asReadOnlyByteBuffer();
-    vmEvent = VertexManagerEvent.create("Vertex", payload);
+    vmEvent = getVertexManagerEvent(null, 100L, "Vertex");
 
     //min/max fraction of 0.0/0.2
     manager = createManager(conf, mockContext, 0.0f, 0.2f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
@@ -353,13 +356,17 @@ public class TestShuffleVertexManager {
     //send 7 events with payload size as 100
     for(int i=0;i<7;i++) {
       manager.onVertexManagerEventReceived(vmEvent); //small payload
-      manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(i));
+      manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, i));
       //should not change parallelism
       verify(mockContext, times(0)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap());
     }
     //send 8th event with payload size as 100
     manager.onVertexManagerEventReceived(vmEvent);
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(8));
+
+    //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
+
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8));
     //Since max threshold (40 * 0.2 = 8) is met, vertex manager should determine parallelism
     verify(mockContext, times(1)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap());
 
@@ -370,38 +377,39 @@ public class TestShuffleVertexManager {
 
     // parallelism changed due to small data size
     scheduledTasks.clear();
-    payload =
-        VertexManagerEventPayloadProto.newBuilder().setOutputSize(500L).build().toByteString().asReadOnlyByteBuffer();
-    vmEvent = VertexManagerEvent.create("Vertex", payload);
 
     manager = createManager(conf, mockContext, 0.5f, 0.5f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
     Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
     // task completion from non-bipartite stage does nothing
-    manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
     Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
     Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+    vmEvent = getVertexManagerEvent(null, 500L, "Vertex");
     manager.onVertexManagerEventReceived(vmEvent);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     Assert.assertEquals(4, manager.pendingTasks.size());
     Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
     Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
     Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
     Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);
     // ignore duplicate completion
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     Assert.assertEquals(4, manager.pendingTasks.size());
     Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
     Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
     Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);
-    
+    vmEvent = getVertexManagerEvent(null, 500L, "Vertex");
     manager.onVertexManagerEventReceived(vmEvent);
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+    //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
+
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
     // managedVertex tasks reduced
     verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
     Assert.assertEquals(2, newEdgeManagers.size());
@@ -415,7 +423,7 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(1000L, manager.completedSourceTasksOutputSize);
     
     // more completions dont cause recalculation of parallelism
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
     Assert.assertEquals(2, newEdgeManagers.size());
     
@@ -490,7 +498,7 @@ public class TestShuffleVertexManager {
     mockInputVertices.put(mockSrcVertexId3, eProp3);
     try {
       manager = createManager(conf, mockContext, 0.1f, 0.1f);
-      manager.onVertexStarted(null);
+      manager.onVertexStarted(emptyCompletions);
       Assert.assertFalse(true);
     } catch (TezUncheckedException e) {
       Assert.assertTrue(e.getMessage().contains(
@@ -502,7 +510,7 @@ public class TestShuffleVertexManager {
     
     // check initialization
     manager = createManager(conf, mockContext, 0.1f, 0.1f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.bipartiteSources == 2);
 
     final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
@@ -510,17 +518,17 @@ public class TestShuffleVertexManager {
       public Object answer(InvocationOnMock invocation) {
           Object[] args = invocation.getArguments();
           scheduledTasks.clear();
-          List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0];
-          for (TaskWithLocationHint task : tasks) {
+          List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+          for (ScheduleTaskRequest task : tasks) {
             scheduledTasks.add(task.getTaskIndex());
           }
           return null;
-      }}).when(mockContext).scheduleVertexTasks(anyList());
+      }}).when(mockContext).scheduleTasks(anyList());
     
     // source vertices have 0 tasks. immediate start of all managed tasks
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0);
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.pendingTasks.isEmpty());
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
     
@@ -560,7 +568,7 @@ public class TestShuffleVertexManager {
     scheduledTasks.clear();
 
     manager = createManager(conf, mockContext, 0.8f, null);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
@@ -573,7 +581,7 @@ public class TestShuffleVertexManager {
     // Finish all tasks before exceeding the threshold
     for (String mockSrcVertex : new String[] { mockSrcVertexId1, mockSrcVertexId2 }) {
       for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) {
-        manager.onSourceTaskCompleted(mockSrcVertex, i);
+        manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i));
         ++completedTasks;
         if ((completedTasks + 1) >= completedTasksThreshold) {
           // stop before completing more than min/max source tasks
@@ -586,7 +594,7 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
 
     // Cross the threshold min/max threshold to schedule all tasks
-    manager.onSourceTaskCompleted(mockSrcVertexId2, completedTasks);
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, completedTasks));
     Assert.assertEquals(0, manager.pendingTasks.size());
     Assert.assertEquals(manager.totalTasksToSchedule, scheduledTasks.size()); // all tasks scheduled
 
@@ -595,8 +603,8 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
 
     // source vertex have some tasks. min, max == 0
-    manager = createManager(conf, mockContext, 0.0f, 0.0f);
-    manager.onVertexStarted(null);
+    manager = createManager(conf, mockContext, 0.f, 0.f);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     Assert.assertTrue(manager.totalTasksToSchedule == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
@@ -609,122 +617,122 @@ public class TestShuffleVertexManager {
     
     // min, max > 0 and min == max
     manager = createManager(conf, mockContext, 0.25f, 0.25f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     // task completion from non-bipartite stage does nothing
-    manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     Assert.assertTrue(manager.pendingTasks.isEmpty());
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
     
     // min, max > 0 and min == max == absolute max 1.0
     manager = createManager(conf, mockContext, 1.0f, 1.0f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     // task completion from non-bipartite stage does nothing
-    manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
     Assert.assertTrue(manager.pendingTasks.isEmpty());
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
     
     // min, max > 0 and min == max
     manager = createManager(conf, mockContext, 1.0f, 1.0f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     // task completion from non-bipartite stage does nothing
-    manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
     Assert.assertTrue(manager.pendingTasks.isEmpty());
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
     
     // min, max > and min < max
     manager = createManager(conf, mockContext, 0.25f, 0.75f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 2);
     Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
     // completion of same task again should not get counted
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 2);
     Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 0);
     Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
     scheduledTasks.clear();
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1)); // we are done. no action
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // we are done. no action
     Assert.assertTrue(manager.pendingTasks.size() == 0);
     Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
 
     // min, max > and min < max
     manager = createManager(conf, mockContext, 0.25f, 1.0f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 2);
     Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 1);
     Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 0);
     Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
@@ -734,9 +742,10 @@ public class TestShuffleVertexManager {
 
   /**
    * Tasks should be scheduled only when all source vertices are configured completely
+   * @throws IOException 
    */
   @Test(timeout = 5000)
-  public void test_Tez1649_with_scatter_gather_edges() {
+  public void test_Tez1649_with_scatter_gather_edges() throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean(
         ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
@@ -780,10 +789,7 @@ public class TestShuffleVertexManager {
     when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(3);
     when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);
 
-    ByteBuffer payload =
-        VertexManagerEventPayloadProto.newBuilder().setOutputSize(50L).build().toByteString().asReadOnlyByteBuffer();
-    VertexManagerEvent vmEvent = VertexManagerEvent.create("Vertex", payload);
-
+    VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, "Vertex");
     // check initialization
     manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
 
@@ -792,14 +798,14 @@ public class TestShuffleVertexManager {
       public Object answer(InvocationOnMock invocation) {
         Object[] args = invocation.getArguments();
         scheduledTasks.clear();
-        List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0];
-        for (TaskWithLocationHint task : tasks) {
+        List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+        for (ScheduleTaskRequest task : tasks) {
           scheduledTasks.add(task.getTaskIndex());
         }
         return null;
-      }}).when(mockContext_R2).scheduleVertexTasks(anyList());
+      }}).when(mockContext_R2).scheduleTasks(anyList());
 
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.bipartiteSources == 3);
     manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
@@ -813,15 +819,15 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
 
     //Send events for all tasks of m3.
-    manager.onSourceTaskCompleted(m3, new Integer(0));
-    manager.onSourceTaskCompleted(m3, new Integer(1));
-    manager.onSourceTaskCompleted(m3, new Integer(2));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 2));
 
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
 
     //Send an event for m2. But still we need to wait for at least 1 event from r1.
-    manager.onSourceTaskCompleted(m2, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
 
@@ -846,7 +852,7 @@ public class TestShuffleVertexManager {
     when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);
 
     manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
     Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
@@ -856,11 +862,42 @@ public class TestShuffleVertexManager {
 
     // Only need completed configuration notification from m3
     manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
-    manager.onSourceTaskCompleted(m3, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 3);
   }
 
+  VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName)
+      throws IOException {
+    ByteBuffer payload = null;
+    if (sizes != null) {
+      /*
+      RoaringBitmap partitionStats = ShuffleUtils.getPartitionStatsForPhysicalOutput(sizes);
+      DataOutputBuffer dout = new DataOutputBuffer();
+      partitionStats.serialize(dout);
+      ByteString
+          partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData());
+      payload =
+          VertexManagerEventPayloadProto.newBuilder()
+              .setOutputSize(totalSize)
+              .setPartitionStats(partitionStatsBytes)
+              .build().toByteString()
+              .asReadOnlyByteBuffer();
+              */
+    } else {
+      payload =
+          VertexManagerEventPayloadProto.newBuilder()
+              .setOutputSize(totalSize)
+              .build().toByteString()
+              .asReadOnlyByteBuffer();
+    }
+    TaskAttemptIdentifierImpl taId = new TaskAttemptIdentifierImpl("dag", vertexName,
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, taskId++), 0));
+    VertexManagerEvent vmEvent = VertexManagerEvent.create(vertexName, payload);
+    vmEvent.setProducerAttemptIdentifier(taId);
+    return vmEvent;
+  }
+
   @Test(timeout = 5000)
   public void test_Tez1649_with_mixed_edges() {
     Configuration conf = new Configuration();
@@ -913,16 +950,16 @@ public class TestShuffleVertexManager {
       public Object answer(InvocationOnMock invocation) {
         Object[] args = invocation.getArguments();
         scheduledTasks.clear();
-        List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0];
-        for (TaskWithLocationHint task : tasks) {
+        List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+        for (ScheduleTaskRequest task : tasks) {
           scheduledTasks.add(task.getTaskIndex());
         }
         return null;
-      }}).when(mockContext).scheduleVertexTasks(anyList());
+      }}).when(mockContext).scheduleTasks(anyList());
 
     // check initialization
     manager = createManager(conf, mockContext, 0.001f, 0.001f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.bipartiteSources == 1);
 
     manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
@@ -933,13 +970,13 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
 
     //Send events for 2 tasks of r1.
-    manager.onSourceTaskCompleted(r1, new Integer(0));
-    manager.onSourceTaskCompleted(r1, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
 
     //Send an event for m2.
-    manager.onSourceTaskCompleted(m2, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
 
@@ -952,7 +989,7 @@ public class TestShuffleVertexManager {
     //Still, wait for a configuration to be completed from other edges
     scheduledTasks.clear();
     manager = createManager(conf, mockContext, 0.001f, 0.001f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
 
     when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
@@ -964,9 +1001,9 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
 
-    manager.onSourceTaskCompleted(r1, new Integer(0));
-    manager.onSourceTaskCompleted(r1, new Integer(1));
-    manager.onSourceTaskCompleted(r1, new Integer(2));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 2));
     //Tasks from non-scatter edges of m2 and m3 are not complete.
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
@@ -979,7 +1016,7 @@ public class TestShuffleVertexManager {
     //try with a zero task vertex (with non-scatter-gather edges)
     scheduledTasks.clear();
     manager = createManager(conf, mockContext, 0.001f, 0.001f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
     when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
@@ -988,7 +1025,7 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexNumTasks(m3)).thenReturn(3); //broadcast
 
     manager = createManager(conf, mockContext, 0.001f, 0.001f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
 
     Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
@@ -996,8 +1033,8 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
 
     //Send 2 events for tasks of r1.
-    manager.onSourceTaskCompleted(r1, new Integer(0));
-    manager.onSourceTaskCompleted(r1, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 0);
 
@@ -1009,7 +1046,7 @@ public class TestShuffleVertexManager {
     //try with all zero task vertices in non-SG edges
     scheduledTasks.clear();
     manager = createManager(conf, mockContext, 0.001f, 0.001f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
     when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
@@ -1019,10 +1056,22 @@ public class TestShuffleVertexManager {
 
     //Send 1 events for tasks of r1.
     manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
-    manager.onSourceTaskCompleted(r1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 3);
   }
+  
+  public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId) {
+    VertexIdentifier mockVertex = mock(VertexIdentifier.class);
+    when(mockVertex.getName()).thenReturn(vName);
+    TaskIdentifier mockTask = mock(TaskIdentifier.class);
+    when(mockTask.getIdentifier()).thenReturn(tId);
+    when(mockTask.getVertexIdentifier()).thenReturn(mockVertex);
+    TaskAttemptIdentifier mockAttempt = mock(TaskAttemptIdentifier.class);
+    when(mockAttempt.getIdentifier()).thenReturn(0);
+    when(mockAttempt.getTaskIdentifier()).thenReturn(mockTask);
+    return mockAttempt;
+  }
 
   private ShuffleVertexManager createManager(Configuration conf,
       VertexManagerPluginContext context, Float min, Float max) {

http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index 7d7069e..04b0a03 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 
 import org.slf4j.Logger;
@@ -56,7 +55,7 @@ import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
@@ -71,6 +70,7 @@ import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.processor.SimpleProcessor;
 import org.junit.After;
@@ -523,8 +523,8 @@ public class TestAMRecovery {
     }
 
     @Override
-    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
-      super.onSourceTaskCompleted(srcVertexName, taskId);
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
+      super.onSourceTaskCompleted(attempt);
       completedTaskNum ++;
       if (getContext().getDAGAttemptNumber() == 1) {
         if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
@@ -532,7 +532,8 @@ public class TestAMRecovery {
             System.exit(-1);
           }
         } else {
-          if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) {
+          if (completedTaskNum == getContext().
+              getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) {
             System.exit(-1);
           }
         }
@@ -562,8 +563,8 @@ public class TestAMRecovery {
     }
 
     @Override
-    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
-      super.onSourceTaskCompleted(srcVertexName, taskId);
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
+      super.onSourceTaskCompleted(attempt);
       completedTaskNum ++;
       if (getContext().getDAGAttemptNumber() == 1) {
         if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
@@ -571,7 +572,8 @@ public class TestAMRecovery {
             System.exit(-1);
           }
         } else {
-          if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) {
+          if (completedTaskNum == getContext().
+              getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) {
             System.exit(-1);
           }
         }
@@ -602,8 +604,8 @@ public class TestAMRecovery {
     }
 
     @Override
-    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
-      super.onSourceTaskCompleted(srcVertexName, taskId);
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
+      super.onSourceTaskCompleted(attempt);
       completedTaskNum ++;
       if (getContext().getDAGAttemptNumber() == 1) {
         if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
@@ -611,7 +613,8 @@ public class TestAMRecovery {
             System.exit(-1);
           }
         } else {
-          if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) {
+          if (completedTaskNum == getContext().
+              getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) {
             System.exit(-1);
           }
         }
@@ -643,26 +646,26 @@ public class TestAMRecovery {
     }
 
     @Override
-    public void onVertexStarted(Map<String, List<Integer>> completions)
+    public void onVertexStarted(List<TaskAttemptIdentifier> completions)
         throws Exception {
       if (getContext().getDAGAttemptNumber() == 1) {
         // only schedule one task if it is partiallyFinished case
         if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
-          getContext().scheduleVertexTasks(Lists.newArrayList(new TaskWithLocationHint(0, null)));
+          getContext().scheduleTasks(Lists.newArrayList(ScheduleTaskRequest.create(0, null)));
           return ;
         }
       }
       // schedule all tasks when it is not partiallyFinished
       int taskNum = getContext().getVertexNumTasks(getContext().getVertexName());
-      List<TaskWithLocationHint> taskWithLocationHints = new ArrayList<TaskWithLocationHint>();
+      List<ScheduleTaskRequest> taskWithLocationHints = new ArrayList<ScheduleTaskRequest>();
       for (int i=0;i<taskNum;++i) {
-        taskWithLocationHints.add(new TaskWithLocationHint(i, null));
+        taskWithLocationHints.add(ScheduleTaskRequest.create(i, null));
       }
-      getContext().scheduleVertexTasks(taskWithLocationHints);
+      getContext().scheduleTasks(taskWithLocationHints);
     }
 
     @Override
-    public void onSourceTaskCompleted(String srcVertexName, Integer taskId)
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt)
         throws Exception {
       
     }
@@ -704,9 +707,9 @@ public class TestAMRecovery {
     }
 
     @Override
-    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
       int curAttempt = getContext().getDAGAttemptNumber();
-      super.onSourceTaskCompleted(srcVertexName, taskId);
+      super.onSourceTaskCompleted(attempt);
       int failOnAttempt = conf.getInt(FAIL_ON_ATTEMPT, 1);
       LOG.info("failOnAttempt:" + failOnAttempt);
       LOG.info("curAttempt:" + curAttempt);

http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
index caf0822..7d88fdf 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
@@ -79,6 +79,7 @@ import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.Writer;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
@@ -696,7 +697,7 @@ public class TestExceptionPropagation {
     }
 
     @Override
-    public void onVertexStarted(Map<String, List<Integer>> completions) {
+    public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
       if (this.exLocation == ExceptionLocation.VM_ON_VERTEX_STARTED) {
         throw new RuntimeException(this.exLocation.name());
       }
@@ -739,11 +740,11 @@ public class TestExceptionPropagation {
     }
 
     @Override
-    public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) {
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
       if (this.exLocation == ExceptionLocation.VM_ON_SOURCETASK_COMPLETED) {
         throw new RuntimeException(this.exLocation.name());
       }
-      super.onSourceTaskCompleted(srcVertexName, attemptId);
+      super.onSourceTaskCompleted(attempt);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index 7244d8d..5c6f855 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -40,7 +40,7 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
 import org.apache.tez.dag.api.client.VertexStatus.State;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
@@ -49,6 +49,7 @@ import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
 import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.InputInitializer;
@@ -57,15 +58,12 @@ import org.apache.tez.runtime.api.Writer;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.test.TestInput;
 import org.apache.tez.test.TestOutput;
 import org.apache.tez.test.TestProcessor;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class MultiAttemptDAG {
@@ -102,14 +100,11 @@ public class MultiAttemptDAG {
     }
 
     @Override
-    public void onVertexStarted(Map<String, List<Integer>> completions) {
+    public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
       if (completions != null) {
-        for (Entry<String, List<Integer>> entry : completions.entrySet()) {
-          LOG.info("Received completion events on vertexStarted"
-              + ", vertex=" + entry.getKey()
-              + ", completions=" + entry.getValue().size());
-          numCompletions.addAndGet(entry.getValue().size());
-        }
+        LOG.info("Received completion events on vertexStarted"
+            + ", completions=" + completions.size());
+        numCompletions.addAndGet(completions.size());
       }
       maybeScheduleTasks();
     }
@@ -129,20 +124,20 @@ public class MultiAttemptDAG {
         } else if (successAttemptId == getContext().getDAGAttemptNumber()) {
           LOG.info("Scheduling tasks for vertex=" + getContext().getVertexName());
           int numTasks = getContext().getVertexNumTasks(getContext().getVertexName());
-          List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasks);
+          List<ScheduleTaskRequest> scheduledTasks = Lists.newArrayListWithCapacity(numTasks);
           for (int i=0; i<numTasks; ++i) {
-            scheduledTasks.add(new TaskWithLocationHint(new Integer(i), null));
+            scheduledTasks.add(ScheduleTaskRequest.create(i, null));
           }
-          getContext().scheduleVertexTasks(scheduledTasks);
+          getContext().scheduleTasks(scheduledTasks);
         }
       }
     }
 
     @Override
-    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
       LOG.info("Received completion events for source task"
-          + ", vertex=" + srcVertexName
-          + ", taskIdx=" + taskId);
+          + ", vertex=" + attempt.getTaskIdentifier().getVertexIdentifier().getName()
+          + ", taskIdx=" + attempt.getTaskIdentifier().getIdentifier());
       numCompletions.incrementAndGet();
       maybeScheduleTasks();
     }