You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/08/05 20:06:17 UTC

[2/2] tez git commit: TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts instead of tasks (bikas)

TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts instead of tasks (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7b45e9a1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7b45e9a1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7b45e9a1

Branch: refs/heads/master
Commit: 7b45e9a142830e7dd8b0263d50dbaaef5fb0da76
Parents: cc1d89c
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Aug 5 11:05:54 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Aug 5 11:05:54 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/tez/dag/api/VertexManagerPlugin.java |  45 ++-
 .../tez/dag/api/VertexManagerPluginContext.java |  37 ++-
 .../apache/tez/runtime/api/DagIdentifier.java   |  26 ++
 .../tez/runtime/api/TaskAttemptIdentifier.java  |  26 ++
 .../apache/tez/runtime/api/TaskIdentifier.java  |  26 ++
 .../tez/runtime/api/VertexIdentifier.java       |  28 ++
 .../runtime/api/events/VertexManagerEvent.java  |  22 +-
 .../tez/dag/records/DagIdentifierImpl.java      |  69 +++++
 .../dag/records/TaskAttemptIdentifierImpl.java  |  70 +++++
 .../tez/dag/records/TaskIdentifierImpl.java     |  70 +++++
 .../tez/dag/records/VertexIdentifierImpl.java   |  77 ++++++
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   4 +-
 .../dag/impl/ImmediateStartVertexManager.java   |  13 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  51 +++-
 .../tez/dag/app/dag/impl/VertexManager.java     |  65 ++---
 .../apache/tez/dag/app/dag/impl/TestCommit.java |   5 +-
 .../impl/TestImmediateStartVertexManager.java   |  20 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  50 ++--
 .../tez/dag/app/dag/impl/TestVertexManager.java |   5 +-
 .../tez/test/VertexManagerPluginForTest.java    |   6 +-
 .../vertexmanager/InputReadyVertexManager.java  |  36 ++-
 .../vertexmanager/ShuffleVertexManager.java     |  35 ++-
 .../TestInputReadyVertexManager.java            | 127 +++++----
 .../vertexmanager/TestShuffleVertexManager.java | 272 ++++++++++---------
 .../org/apache/tez/test/TestAMRecovery.java     |  41 +--
 .../tez/test/TestExceptionPropagation.java      |   7 +-
 .../apache/tez/test/dag/MultiAttemptDAG.java    |  29 +-
 28 files changed, 906 insertions(+), 358 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c1a4e31..708dee5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,8 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
   TEZ-2646. Add scheduling casual dependency for attempts
   TEZ-2647. Add input causality dependency for attempts
+  TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts
+  instead of tasks
 
 ALL CHANGES:
   TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized.

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
index 6aa18d6..b66a66a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.dag.api;
 
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -25,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
 /**
@@ -59,20 +62,58 @@ public abstract class VertexManagerPlugin {
    */
   public abstract void initialize() throws Exception;
 
+  @Deprecated
   /**
+   * This is replaced by {@link VertexManagerPlugin#onVertexStarted(List)}
    * Notification that the vertex is ready to start running tasks
    * @param completions Source vertices and all their tasks that have already completed
    * @throws Exception
    */
-  public abstract void onVertexStarted(Map<String, List<Integer>> completions) throws Exception;
+  public void onVertexStarted(Map<String, List<Integer>> completions) throws Exception {
+    throw new UnsupportedOperationException();
+  }
 
   /**
+   * Notification that the vertex is ready to start running tasks
+   * @param completions All the source task attempts that have already completed
+   * @throws Exception
+   */
+  public void onVertexStarted(List<TaskAttemptIdentifier> completions) throws Exception {
+    Map<String, List<Integer>> completionsMap = new HashMap<String, List<Integer>>();
+    for (TaskAttemptIdentifier attempt : completions) {
+      String vName = attempt.getTaskIdentifier().getVertexIdentifier().getName();
+      List<Integer> tasks = completionsMap.get(vName);
+      if (tasks == null) {
+        tasks = new LinkedList<Integer>();
+        completionsMap.put(vName, tasks);
+      }
+      tasks.add(attempt.getTaskIdentifier().getIdentifier());
+    }
+    onVertexStarted(completionsMap);
+  }
+  
+  @Deprecated
+  /**
+   * This has been replaced by 
+   * {@link VertexManagerPlugin#onSourceTaskCompleted(TaskAttemptIdentifier)}
    * Notification of a source vertex completion.
    * @param srcVertexName
    * @param taskId Index of the task that completed
    * @throws Exception
    */
-  public abstract void onSourceTaskCompleted(String srcVertexName, Integer taskId) throws Exception;
+  public void onSourceTaskCompleted(String srcVertexName, Integer taskId) throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Notification of a source vertex task completion.
+   * @param attempt Identifier of the task attempt that completed
+   * @throws Exception
+   */
+  public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
+    onSourceTaskCompleted(attempt.getTaskIdentifier().getVertexIdentifier().getName(), 
+        attempt.getTaskIdentifier().getIdentifier());
+  }
 
   /**
    * Notification of an event directly sent to this vertex manager

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index 1b012ae..883387b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -27,7 +27,6 @@ import javax.annotation.Nullable;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.runtime.api.InputSpecUpdate;
@@ -43,11 +42,35 @@ import com.google.common.base.Preconditions;
 @Public
 public interface VertexManagerPluginContext {
   
+  public class ScheduleTaskRequest {
+    int taskIndex;
+    TaskLocationHint locationHint;
+
+    public static ScheduleTaskRequest create(int taskIndex, @Nullable TaskLocationHint locationHint) {
+      return new ScheduleTaskRequest(taskIndex, locationHint); 
+    }
+    
+    private ScheduleTaskRequest(int taskIndex, @Nullable TaskLocationHint locationHint) {
+      Preconditions.checkState(taskIndex >= 0);
+      this.taskIndex = taskIndex;
+      this.locationHint = locationHint;
+    }
+    
+    public int getTaskIndex() {
+      return taskIndex;
+    }
+    
+    public TaskLocationHint getTaskLocationHint() {
+      return locationHint;
+    }    
+  }
+  
+  @Deprecated
   public class TaskWithLocationHint {
     Integer taskIndex;
     TaskLocationHint locationHint;
     public TaskWithLocationHint(Integer taskIndex, @Nullable TaskLocationHint locationHint) {
-      Preconditions.checkNotNull(taskIndex);
+      Preconditions.checkState(taskIndex != null);
       this.taskIndex = taskIndex;
       this.locationHint = locationHint;
     }
@@ -152,7 +175,7 @@ public interface VertexManagerPluginContext {
    * destination tasks may need to be updated to account for the new task
    * parallelism. This method can be called to update the parallelism multiple
    * times until any of the tasks of the vertex have been scheduled (by invoking
-   * {@link #scheduleVertexTasks(List)}. If needed, the original source edge
+   * {@link #scheduleTasks(List)}. If needed, the original source edge
    * properties may be obtained via {@link #getInputVertexEdgeProperties()}
    * 
    * @param parallelism
@@ -209,13 +232,21 @@ public interface VertexManagerPluginContext {
    */
   public void addRootInputEvents(String inputName, Collection<InputDataInformationEvent> events);
   
+  @Deprecated
   /**
+   * Replaced by {@link #scheduleTasks(List)}
    * Notify the vertex to start the given tasks
    * @param tasks Indices of the tasks to be started
    */
   public void scheduleVertexTasks(List<TaskWithLocationHint> tasks);
   
   /**
+   * Notify the vertex to schedule the given tasks
+   * @param tasks Identifier and metadata for the tasks to schedule
+   */
+  public void scheduleTasks(List<ScheduleTaskRequest> tasks);
+
+  /**
    * Get the names of the non-vertex inputs of this vertex. These are primary 
    * sources of data.
    * @return Names of inputs to this vertex. Maybe null if there are no inputs

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-api/src/main/java/org/apache/tez/runtime/api/DagIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/DagIdentifier.java b/tez-api/src/main/java/org/apache/tez/runtime/api/DagIdentifier.java
new file mode 100644
index 0000000..dd63b4c
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/DagIdentifier.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+public interface DagIdentifier {
+
+  public String getName();
+  
+  public int getIdentifier();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-api/src/main/java/org/apache/tez/runtime/api/TaskAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskAttemptIdentifier.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskAttemptIdentifier.java
new file mode 100644
index 0000000..101fa91
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskAttemptIdentifier.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+public interface TaskAttemptIdentifier {
+  
+  public int getIdentifier();
+  
+  public TaskIdentifier getTaskIdentifier();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-api/src/main/java/org/apache/tez/runtime/api/TaskIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskIdentifier.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskIdentifier.java
new file mode 100644
index 0000000..8ef066b
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskIdentifier.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+public interface TaskIdentifier {
+  
+  public int getIdentifier();
+  
+  public VertexIdentifier getVertexIdentifier();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-api/src/main/java/org/apache/tez/runtime/api/VertexIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/VertexIdentifier.java b/tez-api/src/main/java/org/apache/tez/runtime/api/VertexIdentifier.java
new file mode 100644
index 0000000..16e88ad
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/VertexIdentifier.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+public interface VertexIdentifier {
+
+  public int getIdentifier();
+  
+  public String getName();
+  
+  public DagIdentifier getDagIdentifier();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
index 484087e..9e73fe5 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
@@ -20,16 +20,19 @@ package org.apache.tez.runtime.api.events;
 
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 
 import com.google.common.base.Preconditions;
 
 /**
  * Event used to send information from a Task to the VertexManager for a vertex.
  * This may be used to send statistics like samples etc to the VertexManager for
- * automatic plan recofigurations based on observed statistics
+ * automatic plan reconfigurations based on observed statistics
  */
 @Unstable
 @Public
@@ -40,6 +43,8 @@ public class VertexManagerEvent extends Event {
    */
   private final String targetVertexName;
   
+  private TaskAttemptIdentifier producerAttempt;
+  
   /**
    * User payload to be sent
    */
@@ -68,4 +73,19 @@ public class VertexManagerEvent extends Event {
   public ByteBuffer getUserPayload() {
     return userPayload == null ? null : userPayload.asReadOnlyBuffer();
   }
+  
+  /**
+   * Get metadata about the task attempt that produced the event.
+   * This method will provide a valid return value only when invoked in the 
+   * {@link VertexManagerPlugin}
+   * @return attempt metadata
+   */
+  public TaskAttemptIdentifier getProducerAttemptIdentifier() {
+    return producerAttempt;
+  }
+  
+  @Private
+  public void setProducerAttemptIdentifier(TaskAttemptIdentifier producerAttempt) {
+    this.producerAttempt = producerAttempt;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-common/src/main/java/org/apache/tez/dag/records/DagIdentifierImpl.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/DagIdentifierImpl.java b/tez-common/src/main/java/org/apache/tez/dag/records/DagIdentifierImpl.java
new file mode 100644
index 0000000..099cb58
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/DagIdentifierImpl.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+import org.apache.tez.runtime.api.DagIdentifier;
+
+public class DagIdentifierImpl implements DagIdentifier {
+
+  private final TezDAGID dagId;
+  private final String dagName;
+  
+  public DagIdentifierImpl(String dagName, TezDAGID dagId) {
+    this.dagId = dagId;
+    this.dagName = dagName;
+  }
+
+  @Override
+  public String getName() {
+    return dagName;
+  }
+
+  @Override
+  public int getIdentifier() {
+    return dagId.getId();
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if(o == null) {
+      return false;
+    }
+    if (o.getClass() == this.getClass()) {
+      DagIdentifierImpl other = (DagIdentifierImpl) o;
+      return this.dagId.equals(other.dagId);
+    }
+    else {
+      return false;
+    }
+  }
+  
+  @Override
+  public String toString() {
+    return "Dag: " + dagName + ":[" + getIdentifier() + "]";
+  }
+  
+  @Override
+  public int hashCode() {
+    return dagId.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptIdentifierImpl.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptIdentifierImpl.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptIdentifierImpl.java
new file mode 100644
index 0000000..b834111
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptIdentifierImpl.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.TaskIdentifier;
+
+public class TaskAttemptIdentifierImpl implements TaskAttemptIdentifier {
+
+  private final TaskIdentifier taskIdentifier;
+  private final TezTaskAttemptID attemptId;
+  
+  public TaskAttemptIdentifierImpl(String dagName, String vertexName, TezTaskAttemptID attemptId) {
+    this.attemptId = attemptId;
+    this.taskIdentifier = new TaskIdentifierImpl(dagName, vertexName, attemptId.getTaskID());
+  }
+
+  @Override
+  public int getIdentifier() {
+    return attemptId.getId();
+  }
+  
+  @Override
+  public TaskIdentifier getTaskIdentifier() {
+    return taskIdentifier;
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if(o == null) {
+      return false;
+    }
+    if (o.getClass() == this.getClass()) {
+      TaskAttemptIdentifierImpl other = (TaskAttemptIdentifierImpl) o;
+      return this.attemptId.equals(other.attemptId);
+    }
+    else {
+      return false;
+    }
+  }
+  
+  @Override
+  public String toString() {
+    return taskIdentifier.toString() + " Attempt: [" + getIdentifier() + "]";
+  }
+  
+  @Override
+  public int hashCode() {
+    return attemptId.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-common/src/main/java/org/apache/tez/dag/records/TaskIdentifierImpl.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskIdentifierImpl.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskIdentifierImpl.java
new file mode 100644
index 0000000..fb0848a
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskIdentifierImpl.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+import org.apache.tez.runtime.api.TaskIdentifier;
+import org.apache.tez.runtime.api.VertexIdentifier;
+
+public class TaskIdentifierImpl implements TaskIdentifier {
+
+  private final VertexIdentifier vertexIdentifier;
+  private final TezTaskID taskId;
+  
+  public TaskIdentifierImpl(String dagName, String vertexName, TezTaskID taskId) {
+    this.taskId = taskId;
+    this.vertexIdentifier = new VertexIdentifierImpl(dagName, vertexName, taskId.getVertexID());
+  }
+
+  @Override
+  public int getIdentifier() {
+    return taskId.getId();
+  }
+  
+  @Override
+  public VertexIdentifier getVertexIdentifier() {
+    return vertexIdentifier;
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if(o == null) {
+      return false;
+    }
+    if (o.getClass() == this.getClass()) {
+      TaskIdentifierImpl other = (TaskIdentifierImpl) o;
+      return this.taskId.equals(other.taskId);
+    }
+    else {
+      return false;
+    }
+  }
+  
+  @Override
+  public String toString() {
+    return vertexIdentifier.toString() + " Task [" + getIdentifier() + "]";
+  }
+
+  @Override
+  public int hashCode() {
+    return taskId.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-common/src/main/java/org/apache/tez/dag/records/VertexIdentifierImpl.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/VertexIdentifierImpl.java b/tez-common/src/main/java/org/apache/tez/dag/records/VertexIdentifierImpl.java
new file mode 100644
index 0000000..4480f74
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/VertexIdentifierImpl.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+import org.apache.tez.runtime.api.DagIdentifier;
+import org.apache.tez.runtime.api.VertexIdentifier;
+
+public class VertexIdentifierImpl implements VertexIdentifier {
+
+  private final DagIdentifier dagIdentifier;
+  private final TezVertexID vertexId;
+  private final String vertexName;
+  
+  public VertexIdentifierImpl(String dagName, String vertexName, TezVertexID vertexId) {
+    this.vertexId = vertexId;
+    this.vertexName = vertexName;
+    this.dagIdentifier = new DagIdentifierImpl(dagName, vertexId.getDAGId());
+  }
+
+  @Override
+  public String getName() {
+    return vertexName;
+  }
+
+  @Override
+  public int getIdentifier() {
+    return vertexId.getId();
+  }
+  
+  @Override
+  public DagIdentifier getDagIdentifier() {
+    return dagIdentifier;
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if(o == null) {
+      return false;
+    }
+    if (o.getClass() == this.getClass()) {
+      VertexIdentifierImpl other = (VertexIdentifierImpl) o;
+      return this.vertexId.equals(other.vertexId);
+    }
+    else {
+      return false;
+    }
+  }
+  
+  @Override
+  public String toString() {
+    return dagIdentifier.toString() + " Vertex: " + vertexName + ":[" + getIdentifier() + "]";
+  }
+  
+  @Override
+  public int hashCode() {
+    return vertexId.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index bb3548d..ab7941e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -37,8 +37,8 @@ import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
 import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
@@ -142,7 +142,7 @@ public interface Vertex extends Comparable<Vertex> {
 
   int getInputVerticesCount();
   int getOutputVerticesCount();
-  void scheduleTasks(List<TaskWithLocationHint> tasks);
+  void scheduleTasks(List<ScheduleTaskRequest> tasks);
   void scheduleSpeculativeTask(TezTaskID taskId);
   Resource getTaskResource();
   

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
index 5e179bd..50624dd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
@@ -28,10 +28,11 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
 import java.util.EnumSet;
@@ -56,7 +57,7 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
   }
 
   @Override
-  public void onVertexStarted(Map<String, List<Integer>> completions) {
+  public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
     managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
     Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
     for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
@@ -90,14 +91,14 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
     }
     
     tasksScheduled = true;
-    List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(managedTasks);
+    List<ScheduleTaskRequest> tasksToStart = Lists.newArrayListWithCapacity(managedTasks);
     for (int i = 0; i < managedTasks; ++i) {
-      tasksToStart.add(new TaskWithLocationHint(i, null));
+      tasksToStart.add(ScheduleTaskRequest.create(i, null));
     }
 
     if (!tasksToStart.isEmpty()) {
       LOG.info("Starting " + tasksToStart.size() + " in " + getContext().getVertexName());
-      getContext().scheduleVertexTasks(tasksToStart);
+      getContext().scheduleTasks(tasksToStart);
     }
     // all tasks scheduled. Can call vertexManagerDone().
     // TODO TEZ-1714 for locking issues getContext().vertexManagerDone();
@@ -134,7 +135,7 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
   }
 
   @Override
-  public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) {
+  public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 9519fa9..accfa62 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -77,7 +77,7 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.ProgressBuilder;
 import org.apache.tez.dag.api.client.StatusGetOpts;
@@ -149,6 +149,7 @@ import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -160,6 +161,7 @@ import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
 import org.apache.tez.runtime.api.InputSpecUpdate;
 import org.apache.tez.runtime.api.OutputStatistics;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.VertexStatistics;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
@@ -1489,16 +1491,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         getInputSpecList(taskIndex), getOutputSpecList(taskIndex), 
         getGroupInputSpecList(taskIndex));
   }
-  
+
   @Override
-  public void scheduleTasks(List<TaskWithLocationHint> tasksToSchedule) {
+  public void scheduleTasks(List<ScheduleTaskRequest> tasksToSchedule) {
     try {
       unsetTasksNotYetScheduled();
       // update state under write lock
       writeLock.lock();
       try {
-        for (TaskWithLocationHint task : tasksToSchedule) {
-          if (numTasks <= task.getTaskIndex().intValue()) {
+        for (ScheduleTaskRequest task : tasksToSchedule) {
+          if (numTasks <= task.getTaskIndex()) {
             throw new TezUncheckedException(
                 "Invalid taskId: " + task.getTaskIndex() + " for vertex: " + logIdentifier);
           }
@@ -1507,7 +1509,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
             if (taskLocationHints == null) {
               taskLocationHints = new TaskLocationHint[numTasks];
             }
-            taskLocationHints[task.getTaskIndex().intValue()] = locationHint;
+            taskLocationHints[task.getTaskIndex()] = locationHint;
           }
         }
       } finally {
@@ -1516,8 +1518,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       
       readLock.lock();
       try {
-        for (TaskWithLocationHint task : tasksToSchedule) {
-          TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex().intValue());
+        for (ScheduleTaskRequest task : tasksToSchedule) {
+          TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex());
           TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId());
           eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec,
               getTaskLocationHint(taskId)));
@@ -2755,8 +2757,24 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   }
   
+  private static List<TaskAttemptIdentifier> getTaskAttemptIdentifiers(DAG dag, 
+      List<TezTaskAttemptID> taIds) {
+    List<TaskAttemptIdentifier> attempts = new ArrayList<>(taIds.size());
+    String dagName = dag.getName();
+    for (TezTaskAttemptID taId : taIds) {
+      String vertexName = dag.getVertex(taId.getTaskID().getVertexID()).getName();
+      attempts.add(getTaskAttemptIdentifier(dagName, vertexName, taId));
+    }
+    return attempts;
+  }
+  
+  private static TaskAttemptIdentifier getTaskAttemptIdentifier(String dagName, String vertexName, 
+      TezTaskAttemptID taId) {
+    return new TaskAttemptIdentifierImpl(dagName, vertexName, taId);
+  }
+  
   private void recoveryCodeSimulatingStart() throws AMUserCodeException {
-    vertexManager.onVertexStarted(pendingReportedSrcCompletions);
+    vertexManager.onVertexStarted(getTaskAttemptIdentifiers(dag, pendingReportedSrcCompletions));
     // This code is duplicated from startVertex() because recovery does not follow normal
     // transitions. To be removed after recovery code is fixed.
     maybeSendConfiguredEvent();
@@ -3552,7 +3570,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
     startedTime = clock.getTime();
     try {
-      vertexManager.onVertexStarted(pendingReportedSrcCompletions);
+      vertexManager.onVertexStarted(getTaskAttemptIdentifiers(dag, pendingReportedSrcCompletions));
     } catch (AMUserCodeException e) {
       String msg = "Exception in " + e.getSource() +", vertex=" + logIdentifier;
       LOG.error(msg, e);
@@ -3811,8 +3829,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         if (vertex.getState() == VertexState.RUNNING) {
           try {
             // Inform the vertex manager about the source task completing.
-            vertex.vertexManager.onSourceTaskCompleted(completionEvent
-                .getTaskAttemptId().getTaskID());
+            TezTaskAttemptID taId = completionEvent.getTaskAttemptId();
+            vertex.vertexManager.onSourceTaskCompleted(
+                getTaskAttemptIdentifier(vertex.dag.getName(), 
+                vertex.dag.getVertex(taId.getTaskID().getVertexID()).getName(), 
+                taId));
           } catch (AMUserCodeException e) {
             String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
             LOG.error(msg, e);
@@ -4317,6 +4338,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         Vertex target = getDAG().getVertex(vmEvent.getTargetVertexName());
         Preconditions.checkArgument(target != null,
             "Event sent to unkown vertex: " + vmEvent.getTargetVertexName());
+        TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
+        if (srcTaId.getTaskID().getVertexID().equals(vertexId)) {
+          // this is the producer tasks' vertex
+          vmEvent.setProducerAttemptIdentifier(
+              getTaskAttemptIdentifier(dag.getName(), getName(), srcTaId));
+        }
         if (target == this) {
           vertexManager.onVertexManagerEventReceived(vmEvent);
         } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 64eb80f..9476860 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -36,7 +36,6 @@ import javax.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
@@ -54,8 +53,6 @@ import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
-import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.CallableEvent;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
@@ -64,10 +61,9 @@ import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
 import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
 import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
 import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.VertexStatistics;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -84,7 +80,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 
-@SuppressWarnings("unchecked")
+@SuppressWarnings({"unchecked", "deprecation"})
 public class VertexManager {
   final VertexManagerPluginDescriptor pluginDesc;
   final UserGroupInformation dagUgi;
@@ -201,10 +197,21 @@ public class VertexManager {
     }
 
     @Override
-    public synchronized void scheduleVertexTasks(List<TaskWithLocationHint> tasks) {
+    public synchronized void scheduleTasks(List<ScheduleTaskRequest> tasks) {
       checkAndThrowIfDone();
       managedVertex.scheduleTasks(tasks);
     }
+    
+    @Override
+    public synchronized void scheduleVertexTasks(List<TaskWithLocationHint> tasks) {
+      checkAndThrowIfDone();
+      List<ScheduleTaskRequest> schedTasks = new ArrayList<ScheduleTaskRequest>(tasks.size());
+      for (TaskWithLocationHint task : tasks) {
+        schedTasks.add(ScheduleTaskRequest.create(
+            task.getTaskIndex(), task.getTaskLocationHint()));
+      }
+      scheduleTasks(schedTasks);
+    }
 
     @Nullable
     @Override
@@ -441,30 +448,12 @@ public class VertexManager {
     }
   }
 
-  public void onVertexStarted(List<TezTaskAttemptID> completions) throws AMUserCodeException {
-    Map<String, List<Integer>> pluginCompletionsMap = Maps.newHashMap();
-    if (completions != null && !completions.isEmpty()) {
-      for (TezTaskAttemptID tezTaskAttemptID : completions) {
-        Integer taskId = Integer.valueOf(tezTaskAttemptID.getTaskID().getId());
-        String vertexName =
-            appContext.getCurrentDAG().getVertex(
-                tezTaskAttemptID.getTaskID().getVertexID()).getName();
-        List<Integer> taskIdList = pluginCompletionsMap.get(vertexName);
-        if (taskIdList == null) {
-          taskIdList = Lists.newArrayList();
-          pluginCompletionsMap.put(vertexName, taskIdList);
-        }
-        taskIdList.add(taskId);
-      }
-    }
-    enqueueAndScheduleNextEvent(new VertexManagerEventOnVertexStarted(pluginCompletionsMap));
+  public void onVertexStarted(List<TaskAttemptIdentifier> completions) throws AMUserCodeException {
+    enqueueAndScheduleNextEvent(new VertexManagerEventOnVertexStarted(completions));
   }
 
-  public void onSourceTaskCompleted(TezTaskID tezTaskId) throws AMUserCodeException {
-    Integer taskId = Integer.valueOf(tezTaskId.getId());
-    String vertexName =
-        appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
-    enqueueAndScheduleNextEvent(new VertexManagerEventSourceTaskCompleted(taskId, vertexName));
+  public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws AMUserCodeException {
+    enqueueAndScheduleNextEvent(new VertexManagerEventSourceTaskCompleted(attempt));
   }
 
   public void onVertexManagerEventReceived(
@@ -562,31 +551,29 @@ public class VertexManager {
   }
   
   class VertexManagerEventOnVertexStarted extends VertexManagerEvent {
-    private final Map<String, List<Integer>> pluginCompletionsMap;
+    private final List<TaskAttemptIdentifier> pluginCompletions;
 
-    public VertexManagerEventOnVertexStarted(Map<String, List<Integer>> pluginCompletionsMap) {
-      this.pluginCompletionsMap = pluginCompletionsMap;
+    public VertexManagerEventOnVertexStarted(List<TaskAttemptIdentifier> pluginCompletions) {
+      this.pluginCompletions = pluginCompletions;
     }
     
     @Override
     public void invoke() throws Exception {
-      plugin.onVertexStarted(pluginCompletionsMap);
+      plugin.onVertexStarted(pluginCompletions);
     }
     
   }
   
   class VertexManagerEventSourceTaskCompleted extends VertexManagerEvent {
-    private final Integer taskId;
-    private final String vertexName;
+    private final TaskAttemptIdentifier attempt;
     
-    public VertexManagerEventSourceTaskCompleted(Integer taskId, String vertexName) {
-      this.taskId = taskId;
-      this.vertexName = vertexName;
+    public VertexManagerEventSourceTaskCompleted(TaskAttemptIdentifier attempt) {
+      this.attempt = attempt;
     }
     
     @Override
     public void invoke() throws Exception {
-      plugin.onSourceTaskCompleted(vertexName, taskId);      
+      plugin.onSourceTaskCompleted(attempt);      
     }
     
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
index 6d23df3..83421a2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -84,7 +84,6 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
-import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -101,6 +100,7 @@ import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.*;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.OutputCommitter;
@@ -788,9 +788,10 @@ public class TestCommit {
     Assert.assertEquals(VertexState.COMMITTING, v1.getState());
     // reschedule task
     VertexManagerEvent vmEvent = VertexManagerEvent.create("vertex1", ByteBuffer.wrap(new byte[0]));
+    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(v1.getVertexId(), 0), 0);
     TezEvent tezEvent = new TezEvent(vmEvent,
         new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", 
-            null, null));
+            null, taId));
     v1.handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
     waitUntil(dag, DAGState.FAILED);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
index 6d071a7..a17c7c5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
@@ -28,9 +28,10 @@ import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -92,24 +93,25 @@ public class TestImmediateStartVertexManager {
       public Object answer(InvocationOnMock invocation) {
           Object[] args = invocation.getArguments();
           scheduledTasks.clear();
-          List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0];
-          for (TaskWithLocationHint task : tasks) {
+          List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+          for (ScheduleTaskRequest task : tasks) {
             scheduledTasks.add(task.getTaskIndex());
           }
           return null;
-      }}).when(mockContext).scheduleVertexTasks(anyList());
+      }}).when(mockContext).scheduleTasks(anyList());
     
+    List<TaskAttemptIdentifier> emptyCompletions = null;
     ImmediateStartVertexManager manager = new ImmediateStartVertexManager(mockContext);
     manager.initialize();
-    manager.onVertexStarted(null);
-    verify(mockContext, times(0)).scheduleVertexTasks(anyList());
+    manager.onVertexStarted(emptyCompletions);
+    verify(mockContext, times(0)).scheduleTasks(anyList());
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
         VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
         VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
         VertexState.CONFIGURED));
-    verify(mockContext, times(1)).scheduleVertexTasks(anyList());
+    verify(mockContext, times(1)).scheduleTasks(anyList());
     Assert.assertEquals(4, scheduledTasks.size());
 
     // simulate race between onVertexStarted and notifications
@@ -123,8 +125,8 @@ public class TestImmediateStartVertexManager {
         return null;
     }}).when(mockContext).registerForVertexStateUpdates(anyString(), anySet());
     raceManager.initialize();
-    raceManager.onVertexStarted(null);
-    verify(mockContext, times(2)).scheduleVertexTasks(anyList());
+    raceManager.onVertexStarted(emptyCompletions);
+    verify(mockContext, times(2)).scheduleTasks(anyList());
     Assert.assertEquals(4, scheduledTasks.size());
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 3c0dd1e..cfc297e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -94,7 +94,7 @@ import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
@@ -161,6 +161,7 @@ import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -170,6 +171,7 @@ import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.InputSpecUpdate;
 import org.apache.tez.runtime.api.InputInitializer;
 import org.apache.tez.runtime.api.InputInitializerContext;
@@ -2513,10 +2515,10 @@ public class TestVertexImpl {
     // verify all events have been put in pending.
     // this is not necessary after legacy routing has been removed
     Assert.assertEquals(5, v4.pendingTaskEvents.size());
-    List<TaskWithLocationHint> taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+    List<ScheduleTaskRequest> taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
     // scheduling start to trigger edge routing to begin
     for (int i=0; i<v4.getTotalTasks(); ++i) {
-      taskList.add(new TaskWithLocationHint(i, null));
+      taskList.add(ScheduleTaskRequest.create(i, null));
     }
     v4.scheduleTasks(taskList);
     dispatcher.await();
@@ -2604,10 +2606,10 @@ public class TestVertexImpl {
     VertexImpl v3 = vertices.get("vertex3");
     VertexImpl v4 = vertices.get("vertex4");
     
-    List<TaskWithLocationHint> taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+    List<ScheduleTaskRequest> taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
     // scheduling start to trigger edge routing to begin
     for (int i=0; i<v4.getTotalTasks(); ++i) {
-      taskList.add(new TaskWithLocationHint(i, null));
+      taskList.add(ScheduleTaskRequest.create(i, null));
     }
     v4.scheduleTasks(taskList);
     Assert.assertEquals(VertexState.RUNNING, v4.getState());
@@ -2662,10 +2664,10 @@ public class TestVertexImpl {
     VertexImpl v3 = vertices.get("vertex3");
     VertexImpl v4 = vertices.get("vertex4");
     
-    List<TaskWithLocationHint> taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+    List<ScheduleTaskRequest> taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
     // scheduling start to trigger edge routing to begin
     for (int i=0; i<v4.getTotalTasks(); ++i) {
-      taskList.add(new TaskWithLocationHint(i, null));
+      taskList.add(ScheduleTaskRequest.create(i, null));
     }
     v4.scheduleTasks(taskList);
     Assert.assertEquals(VertexState.RUNNING, v4.getState());
@@ -2856,7 +2858,7 @@ public class TestVertexImpl {
     startVertex(v1);
     v3.reconfigureVertex(10, null, null);
     checkTasks(v3, 10);
-    v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
+    v3.scheduleTasks(Collections.singletonList(ScheduleTaskRequest.create(0, null)));
     try {
       v3.reconfigureVertex(5, null, null);
       Assert.fail();
@@ -2881,7 +2883,7 @@ public class TestVertexImpl {
     checkTasks(v3, 10);
     taskEventDispatcher.events.clear();
     TaskLocationHint mockLocation = mock(TaskLocationHint.class);
-    v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), mockLocation)));
+    v3.scheduleTasks(Collections.singletonList(ScheduleTaskRequest.create(0, mockLocation)));
     dispatcher.await();
     Assert.assertEquals(1, taskEventDispatcher.events.size());
     TaskEventScheduleTask event = (TaskEventScheduleTask) taskEventDispatcher.events.get(0);
@@ -2901,7 +2903,7 @@ public class TestVertexImpl {
     VertexImpl v1 = vertices.get("vertex1");
     startVertex(vertices.get("vertex2"));
     startVertex(v1);
-    v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
+    v3.scheduleTasks(Collections.singletonList(ScheduleTaskRequest.create(0, null)));
     try {
       v3.reconfigureVertex(5, null, null);
       Assert.fail();
@@ -2937,7 +2939,7 @@ public class TestVertexImpl {
         new VertexEventRouteEvent(v3.getVertexId(), taskEvents));
     dispatcher.await();
     Assert.assertEquals(2, v3.pendingTaskEvents.size());
-    v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
+    v3.scheduleTasks(Collections.singletonList(ScheduleTaskRequest.create(0, null)));
     dispatcher.await();
     Assert.assertEquals(0, v3.pendingTaskEvents.size());
     // send events and test that they are not buffered anymore
@@ -4892,10 +4894,10 @@ public class TestVertexImpl {
       Assert.assertEquals(1, inputSpecs.get(0).getPhysicalEdgeCount());
     }
     
-    List<TaskWithLocationHint> taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+    List<ScheduleTaskRequest> taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
     // scheduling start to trigger edge routing to begin
     for (int i=0; i<v1.getTotalTasks(); ++i) {
-      taskList.add(new TaskWithLocationHint(i, null));
+      taskList.add(ScheduleTaskRequest.create(i, null));
     }
     v1.scheduleTasks(taskList);
     dispatcher.await();
@@ -4940,10 +4942,10 @@ public class TestVertexImpl {
     Assert.assertEquals(true, initializerManager2.hasShutDown);
     
     // scheduling start to trigger edge routing to begin
-    taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+    taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
     // scheduling start to trigger edge routing to begin
     for (int i=0; i<v2.getTotalTasks(); ++i) {
-      taskList.add(new TaskWithLocationHint(i, null));
+      taskList.add(ScheduleTaskRequest.create(i, null));
     }
     v2.scheduleTasks(taskList);
     dispatcher.await();
@@ -5770,8 +5772,10 @@ public class TestVertexImpl {
     RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
     initializerManager1.completeInputInitialization();
 
-    Event vmEvent = VertexManagerEvent.create(v1.getName(), ByteBuffer.wrap(new byte[0]));
-    TezEvent tezEvent = new TezEvent(vmEvent, null);
+    VertexManagerEvent vmEvent = VertexManagerEvent.create(v1.getName(), ByteBuffer.wrap(new byte[0]));
+    TezTaskAttemptID taId1 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(v1.getVertexId(), 0), 0);
+    TezEvent tezEvent = new TezEvent(vmEvent, new EventMetaData(EventProducerConsumerType.OUTPUT,
+        v1.getName(), null, taId1));
     dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v1.getVertexId(), Lists.newArrayList(tezEvent)));
     dispatcher.await();
 
@@ -6043,7 +6047,7 @@ public class TestVertexImpl {
     verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture());
     verifyHistoryEvents(argCaptor.getAllValues(), HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1);
 
-    v3.scheduleTasks(Lists.newArrayList(new TaskWithLocationHint(0, null)));
+    v3.scheduleTasks(Lists.newArrayList(ScheduleTaskRequest.create(0, null)));
     dispatcher.await();
     assertTrue(v3.pendingTaskEvents.size() == 0);
     // recovery events is not only handled one time
@@ -6078,11 +6082,11 @@ public class TestVertexImpl {
     }
 
     @Override
-    public void onVertexStarted(Map<String, List<Integer>> completions) {
+    public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
     }
 
     @Override
-    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
     }
 
     @Override
@@ -6189,15 +6193,15 @@ public class TestVertexImpl {
     }
     
     @Override
-    public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) {
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
       if (this.exLocation == VMExceptionLocation.OnSourceTaskCompleted) {
         throw new RuntimeException(this.exLocation.name());
       }
-      super.onSourceTaskCompleted(srcVertexName, attemptId);
+      super.onSourceTaskCompleted(attempt);
     }
     
     @Override
-    public void onVertexStarted(Map<String, List<Integer>> completions) {
+    public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
       if (this.exLocation == VMExceptionLocation.OnVertexStarted) {
         throw new RuntimeException(this.exLocation.name());
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
index 81cb42a..9c16f5e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
@@ -47,6 +47,7 @@ import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.CallableEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.api.impl.TezEvent;
@@ -183,11 +184,11 @@ public class TestVertexManager {
     }
 
     @Override
-    public void onVertexStarted(Map<String, List<Integer>> completions) {
+    public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
     }
 
     @Override
-    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
index 84e060b..1cdaeca 100644
--- a/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
+++ b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
@@ -25,7 +25,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.api.InputDescriptor;
@@ -34,6 +33,7 @@ import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
 public class VertexManagerPluginForTest extends VertexManagerPlugin {
@@ -99,14 +99,14 @@ public class VertexManagerPluginForTest extends VertexManagerPlugin {
   }
 
   @Override
-  public void onVertexStarted(Map<String, List<Integer>> completions) {
+  public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
     if (pluginConfig.getReconfigureOnStart()) {
       getContext().reconfigureVertex(pluginConfig.getNumTasks(), null, null);
     }
   }
 
   @Override
-  public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {}
+  public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {}
 
   @Override
   public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {}

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index 30e3e81..f05cd95 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.tez.dag.library.vertexmanager;
 
-import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -34,17 +33,16 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
 
 @Private
 public class InputReadyVertexManager extends VertexManagerPlugin {
@@ -57,7 +55,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
   TaskLocationHint oneToOneLocationHints[];
   int numOneToOneEdges;
   int numConfiguredSources;
-  Multimap<String, Integer> pendingCompletions = LinkedListMultimap.create();
+  List<TaskAttemptIdentifier> pendingCompletions = Lists.newLinkedList();
   AtomicBoolean configured;
   AtomicBoolean started;
 
@@ -144,10 +142,8 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
   
   private void trySchedulingPendingCompletions() {
     if (readyToSchedule() && !pendingCompletions.isEmpty()) {
-      for (Map.Entry<String, Collection<Integer>> entry : pendingCompletions.asMap().entrySet()) {
-        for (Integer i : entry.getValue()) {
-          onSourceTaskCompleted(entry.getKey(), i);
-        }
+      for (TaskAttemptIdentifier attempt : pendingCompletions) {
+        onSourceTaskCompleted(attempt);
       }
     }
   }
@@ -180,10 +176,10 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
   }
 
   @Override
-  public synchronized void onVertexStarted(Map<String, List<Integer>> completions) {
-    for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
-      pendingCompletions.putAll(entry.getKey(), entry.getValue());
-    }    
+  public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions) {
+    if (completions != null) {
+      pendingCompletions.addAll(completions);
+    }
 
     // allow scheduling
     started.set(true);
@@ -192,12 +188,14 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
   }
 
   @Override
-  public synchronized void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+  public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
+    String srcVertexName = attempt.getTaskIdentifier().getVertexIdentifier().getName();
+    int taskId = attempt.getTaskIdentifier().getIdentifier();
     if (readyToSchedule()) {
       // configured and started. try to schedule
       handleSourceTaskFinished(srcVertexName, taskId);
     } else {
-      pendingCompletions.put(srcVertexName, taskId);
+      pendingCompletions.add(attempt);
     }
   }
 
@@ -245,7 +243,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
     }
     
     // all source vertices will full dependencies are done
-    List<TaskWithLocationHint> tasksToStart = null;
+    List<ScheduleTaskRequest> tasksToStart = null;
     if (numOneToOneEdges == 0) {
       // no 1-1 dependency. Start all tasks
       int numTasks = taskIsStarted.length;
@@ -253,7 +251,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
       tasksToStart = Lists.newArrayListWithCapacity(numTasks);
       for (int i=0; i<numTasks; ++i) {
         taskIsStarted[i] = true;
-        tasksToStart.add(new TaskWithLocationHint(Integer.valueOf(i), null));
+        tasksToStart.add(ScheduleTaskRequest.create(i, null));
       }
     } else {
       // start only the ready 1-1 tasks
@@ -268,13 +266,13 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
           LOG.info("Starting task " + i + " for vertex: "
               + getContext().getVertexName() + " with location: "
               + ((locationHint != null) ? locationHint.getAffinitizedTask() : "null"));
-          tasksToStart.add(new TaskWithLocationHint(Integer.valueOf(i), locationHint));
+          tasksToStart.add(ScheduleTaskRequest.create(Integer.valueOf(i), locationHint));
         }
       }
     }
     
     if (tasksToStart != null && !tasksToStart.isEmpty()) {
-      getContext().scheduleVertexTasks(tasksToStart);
+      getContext().scheduleTasks(tasksToStart);
     }
     
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 6c3e3f8..d9c4941 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -24,6 +24,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 
@@ -46,11 +47,13 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.TaskIdentifier;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
@@ -72,6 +75,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -143,6 +147,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   int totalTasksToSchedule = 0;
   private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
   
+  private Set<TaskIdentifier> taskWithVmEvents = Sets.newHashSet();
+  
   //Track source vertex and its finished tasks
   private final Map<String, SourceVertexInfo> srcVertexInfo = Maps.newConcurrentMap();
   boolean sourceVerticesScheduled = false;
@@ -469,7 +475,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
 
   
   @Override
-  public synchronized void onVertexStarted(Map<String, List<Integer>> completions) {
+  public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions) {
     // examine edges after vertex started because until then these may not have been defined
     Map<String, EdgeProperty> inputs = getContext().getInputVertexEdgeProperties();
     for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) {
@@ -498,10 +504,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
              totalTasksToSchedule + " pending tasks");
     
     if (completions != null) {
-      for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
-        for (Integer taskId : entry.getValue()) {
-          onSourceTaskCompleted(entry.getKey(), taskId);
-        }
+      for (TaskAttemptIdentifier attempt : completions) {
+        onSourceTaskCompleted(attempt);
       }
     }
     onVertexStartedDone.set(true);
@@ -511,7 +515,9 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
 
 
   @Override
-  public synchronized void onSourceTaskCompleted(String srcVertexName, Integer srcTaskId) {
+  public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
+    String srcVertexName = attempt.getTaskIdentifier().getVertexIdentifier().getName();
+    int srcTaskId = attempt.getTaskIdentifier().getIdentifier();
     updateSourceTaskCount();
     SourceVertexInfo srcInfo = srcVertexInfo.get(srcVertexName);
 
@@ -550,7 +556,14 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
 
   @Override
   public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
-    // TODO handle duplicates from retries
+    // currently events from multiple attempts of the same task can be ignored because
+    // their output will be the same. However, with pipelined events that may not hold.
+    TaskIdentifier producerTask = vmEvent.getProducerAttemptIdentifier().getTaskIdentifier();
+    if (!taskWithVmEvents.add(producerTask)) {
+      LOG.info("Ignoring vertex manager event from: " + producerTask);
+      return;
+    }
+    
     numVertexManagerEventsReceived++;
 
     long sourceTaskOutputSize = 0;
@@ -758,16 +771,16 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     }
     //Sort in case partition stats are available
     sortPendingTasksBasedOnDataSize();
-    List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasksToSchedule);
+    List<ScheduleTaskRequest> scheduledTasks = Lists.newArrayListWithCapacity(numTasksToSchedule);
 
     while(!pendingTasks.isEmpty() && numTasksToSchedule > 0) {
       numTasksToSchedule--;
       Integer taskIndex = pendingTasks.get(0).index;
-      scheduledTasks.add(new TaskWithLocationHint(taskIndex, null));
+      scheduledTasks.add(ScheduleTaskRequest.create(taskIndex, null));
       pendingTasks.remove(0);
     }
 
-    getContext().scheduleVertexTasks(scheduledTasks);
+    getContext().scheduleTasks(scheduledTasks);
     if (pendingTasks.size() == 0) {
       // done scheduling all tasks
       // TODO TEZ-1714 locking issues. getContext().vertexManagerDone();