You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/09/11 21:57:58 UTC

[2/2] git commit: TEZ-407. Support multiple inputs and connection patterns in Tez (bikas)

TEZ-407. Support multiple inputs and connection patterns in Tez (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8d89485f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8d89485f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8d89485f

Branch: refs/heads/TEZ-398
Commit: 8d89485fd0c2d76894894447bb826b367ee2be7a
Parents: 0f5298a
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Sep 11 12:56:44 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Sep 11 12:56:44 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/api/DagTypeConverters.java   |  25 +--
 .../main/java/org/apache/tez/dag/api/Edge.java  |   1 +
 .../org/apache/tez/dag/api/TestDAGPlan.java     |   7 +-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  74 +++++--
 .../java/org/apache/tez/dag/app/dag/DAG.java    |   4 +
 .../org/apache/tez/dag/app/dag/EdgeManager.java |  51 +++++
 .../java/org/apache/tez/dag/app/dag/Task.java   |   7 +
 .../java/org/apache/tez/dag/app/dag/Vertex.java |  24 ++-
 .../apache/tez/dag/app/dag/VertexScheduler.java |   2 +-
 .../dag/app/dag/event/TaskEventAddTezEvent.java |  36 ++++
 .../tez/dag/app/dag/event/TaskEventType.java    |   3 +
 .../app/dag/event/VertexEventRouteEvent.java    |  39 ++++
 .../tez/dag/app/dag/event/VertexEventType.java  |   2 +
 .../dag/app/dag/impl/BroadcastEdgeManager.java  |  69 ++++++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  45 ++--
 .../org/apache/tez/dag/app/dag/impl/Edge.java   | 214 +++++++++++++++++++
 .../dag/app/dag/impl/OneToOneEdgeManager.java   |  66 ++++++
 .../app/dag/impl/ScatterGatherEdgeManager.java  |  77 +++++++
 .../dag/app/dag/impl/ShuffleVertexManager.java  | 100 ++++++++-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   5 +-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  76 +++++++
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 178 ++++++++++++---
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 112 +++++-----
 .../dag/app/dag/impl/TestVertexScheduler.java   |  54 +++--
 .../engine/newapi/events/DataMovementEvent.java |   2 +-
 .../engine/newapi/events/InputFailedEvent.java  |   2 +-
 .../tez/engine/newapi/impl/EventMetaData.java   |  14 +-
 .../newapi/impl/TezHeartbeatResponse.java       |  33 ++-
 28 files changed, 1122 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index fd07b5b..1fd78f1 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -209,23 +209,14 @@ public class DagTypeConverters {
     return edgePlanMap;
   }
   
-  public static Map<String, EdgeProperty> createEdgePropertyMapFromDAGPlan(
-      List<EdgePlan> edgeList) {  
-      
-    Map<String, EdgeProperty> map = new HashMap<String, EdgeProperty>();
-    for(EdgePlan edge: edgeList){
-       map.put(edge.getId(), 
-           new EdgeProperty(
-               convertFromDAGPlan(edge.getDataMovementType()),
-               convertFromDAGPlan(edge.getDataSourceType()),
-               convertFromDAGPlan(edge.getSchedulingType()),
-               convertOutputDescriptorFromDAGPlan(edge.getEdgeSource()),
-               convertInputDescriptorFromDAGPlan(edge.getEdgeDestination())
-               )
-           );
-    }
-    
-    return map;
+  public static EdgeProperty createEdgePropertyMapFromDAGPlan(EdgePlan edge) {
+    return new EdgeProperty(
+        convertFromDAGPlan(edge.getDataMovementType()),
+        convertFromDAGPlan(edge.getDataSourceType()),
+        convertFromDAGPlan(edge.getSchedulingType()),
+        convertOutputDescriptorFromDAGPlan(edge.getEdgeSource()),
+        convertInputDescriptorFromDAGPlan(edge.getEdgeDestination())
+    );
   }
 
   public static Resource createResourceRequestFromTaskConfig(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag-api/src/main/java/org/apache/tez/dag/api/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/Edge.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/Edge.java
index 71c90fa..a893bc3 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/Edge.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/Edge.java
@@ -31,6 +31,7 @@ public class Edge{
     this.edgeProperty = edgeProperty;
   }
   
+  // RENAME to source and destination
   public Vertex getInputVertex() {
     return inputVertex;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index c8d09d0..53ec357 100644
--- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -25,7 +25,6 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -142,10 +141,8 @@ public class TestDAGPlan {
         .getUserPayload().toByteArray()));
     assertEquals("output", edgeProto.getEdgeSource().getClassName());
 
-    Map<String, EdgeProperty> edgePropertyMap = DagTypeConverters
-        .createEdgePropertyMapFromDAGPlan(dagProto.getEdgeList());
-    assertEquals(1, edgePropertyMap.size());
-    EdgeProperty edgeProperty = edgePropertyMap.values().iterator().next();
+    EdgeProperty edgeProperty = DagTypeConverters
+        .createEdgePropertyMapFromDAGPlan(dagProto.getEdgeList().get(0));
 
     byte[] ib = edgeProperty.getEdgeDestination().getUserPayload();
     assertEquals("inputBytes", new String(ib));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 2c242de..cc99af2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -54,10 +55,12 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.rm.container.AMContainerImpl;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;
 import org.apache.tez.dag.app.security.authorize.MRAMPolicyProvider;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.engine.common.security.JobTokenSecretManager;
 import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
 import org.apache.tez.engine.newapi.impl.TezEvent;
@@ -89,9 +92,18 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   private Server server;
 
 
-  // TODO Use this to figure out whether an incoming ping is valid.
-  private ConcurrentMap<TezTaskAttemptID, ContainerId> attemptToContainerIdMap =
-      new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
+  class AttemptInfo {
+    AttemptInfo(ContainerId containerId) {
+      this.containerId = containerId;
+      this.lastReponse = null;
+      this.lastRequestId = -1;
+    }
+    ContainerId containerId;
+    long lastRequestId;
+    TezHeartbeatResponse lastReponse;
+  }
+  private ConcurrentMap<TezTaskAttemptID, AttemptInfo> attemptToInfoMap =
+      new ConcurrentHashMap<TezTaskAttemptID, AttemptInfo>();
 
   private Set<ContainerId> registeredContainers = Collections
       .newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
@@ -479,9 +491,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     // between polls (MRTask) implies tasks end up wasting upto 1 second doing
     // nothing. Similarly for CA_COMMIT.
 
+    /*
+    DAG job = context.getCurrentDAG();
+    Task task =
+        job.getVertex(taskAttemptId.getTaskID().getVertexID()).
+            getTask(taskAttemptId.getTaskID());
 
     // TODO In-Memory Shuffle
-    /*
     if (task.needsWaitAfterOutputConsumable()) {
       TezTaskAttemptID outputReadyAttempt = task.getOutputConsumableAttempt();
       if (outputReadyAttempt != null) {
@@ -518,7 +534,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
 
   @Override
   public void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
-    attemptToContainerIdMap.remove(attemptId);
+    attemptToInfoMap.remove(attemptId);
   }
 
   public AMContainerTask pullTaskAttemptContext(ContainerId containerId) {
@@ -539,7 +555,8 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   @Override
   public void registerTaskAttempt(TezTaskAttemptID attemptId,
       ContainerId containerId) {
-    attemptToContainerIdMap.put(attemptId, containerId);
+    AttemptInfo attemptInfo = new AttemptInfo(containerId);
+    attemptToInfoMap.put(attemptId, attemptInfo);
   }
 
   @Override
@@ -556,7 +573,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) {
-    ContainerId containerId = attemptToContainerIdMap.get(taskAttemptId);
+    ContainerId containerId = attemptToInfoMap.get(taskAttemptId).containerId;
     if (containerId != null) {
       containerHeartbeatHandler.pinged(containerId);
     } else {
@@ -568,12 +585,45 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   @Override
   public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
       throws IOException, TezException {
-    // TODO TODONEWTEZ Auto-generated method stub
+    long requestId = request.getRequestId();
     TezTaskAttemptID taskAttemptID = request.getCurrentTaskAttemptID();
-    LOG.info("Ping from " + taskAttemptID.toString());
-    taskHeartbeatHandler.pinged(taskAttemptID);
-    pingContainerHeartbeatHandler(taskAttemptID);
-    return null;
+    AttemptInfo attemptInfo = attemptToInfoMap.get(taskAttemptID);
+    if(attemptInfo == null) {
+      throw new TezException("Attempt " + taskAttemptID
+          + " is not recognized for heartbeat");
+    }
+    synchronized (attemptInfo) {      
+      if(attemptInfo.lastRequestId == requestId) {
+        return attemptInfo.lastReponse;
+      }
+      if(attemptInfo.lastRequestId+1 < requestId) {
+        throw new TezException("Attempt " + taskAttemptID
+            + " has invalid request id. Expected: " + attemptInfo.lastRequestId+1 
+            + " and actual: " + requestId);
+      }
+      
+      // not safe to multiple call from same task
+      LOG.info("Ping from " + taskAttemptID.toString());
+      List<TezEvent> inEvents = request.getEvents();
+      if(inEvents!=null && inEvents.size()>0) {    
+        TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
+        context.getEventHandler().handle(new VertexEventRouteEvent(vertexId, inEvents));
+      }
+      taskHeartbeatHandler.pinged(taskAttemptID);
+      pingContainerHeartbeatHandler(taskAttemptID);
+      TezHeartbeatResponse response = new TezHeartbeatResponse();
+      response.setLastRequestId(requestId);
+      List<TezEvent> outEvents = context
+          .getCurrentDAG()
+          .getVertex(taskAttemptID.getTaskID().getVertexID())
+          .getTask(taskAttemptID.getTaskID())
+          .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(),
+              request.getMaxEvents());
+      response.setEvents(outEvents);
+      attemptInfo.lastRequestId = requestId;
+      attemptInfo.lastReponse = response;
+      return response;
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 455b583..ce1ee89 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -49,6 +49,10 @@ public interface DAG {
    */
   TezCounters getAllCounters();
 
+  /**
+   * Get Vertex by vertex name
+   */
+  Vertex getVertex(String vertexName);
   Map<TezVertexID,Vertex> getVertices();
   Vertex getVertex(TezVertexID vertexId);
   List<String> getDiagnostics();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
new file mode 100644
index 0000000..c476966
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
@@ -0,0 +1,51 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag;
+
+import java.util.List;
+
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+
+public abstract class EdgeManager {
+  
+  public abstract int getNumDestinationTaskInputs(Vertex sourceVertex,
+      int destinationTaskIndex);
+
+  public abstract int getNumSourceTaskOutputs(Vertex destinationVertex,
+      int sourceTaskIndex);
+  
+  /**
+   * Return the destination task indeces to which to send the event
+   */
+  public abstract void routeEventToDestinationTasks(DataMovementEvent event,
+      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices);
+  
+  public abstract void routeEventToDestinationTasks(InputFailedEvent event,
+      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices);
+
+  
+  /**
+   * Return the source task index to which to send the event
+   */
+  public abstract int routeEventToSourceTasks(int destinationTaskIndex,
+      InputReadErrorEvent event);
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index f3ca831..0947a41 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.app.dag;
 
+import java.util.List;
 import java.util.Map;
 
 import org.apache.tez.common.counters.TezCounters;
@@ -25,6 +26,7 @@ import org.apache.tez.dag.api.oldrecords.TaskReport;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.newapi.impl.TezEvent;
 
 /**
  * Read only view of Task.
@@ -71,4 +73,9 @@ public interface Task {
   TezTaskAttemptID getOutputConsumableAttempt();
   
   public Vertex getVertex();
+  
+  public List<TezEvent> getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
+      int fromEventId, int maxEvents);
+  
+  public List<TezEvent> getAndClearTaskTezEvents();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 48c9993..85240e7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -24,11 +24,11 @@ import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.api.client.ProgressBuilder;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
+import org.apache.tez.dag.app.dag.impl.Edge;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -57,6 +57,7 @@ public interface Vertex extends Comparable<Vertex> {
 
   Map<TezTaskID, Task> getTasks();
   Task getTask(TezTaskID taskID);
+  Task getTask(int taskIndex);
   List<String> getDiagnostics();
   int getTotalTasks();
   int getCompletedTasks();
@@ -65,19 +66,20 @@ public interface Vertex extends Comparable<Vertex> {
   ProgressBuilder getVertexProgress();
   VertexStatusBuilder getVertexStatus();
 
-  void setParallelism(int parallelism, List<byte[]> taskUserPayloads);
+  void setParallelism(int parallelism,Map<Vertex, EdgeManager> sourceEdgeManagers);
 
   TezDependentTaskCompletionEvent[] getTaskAttemptCompletionEvents(
       TezTaskAttemptID attemptId, int fromEventId, int maxEvents);
-
-  void setInputVertices(Map<Vertex, EdgeProperty> inVertices);
-  void setOutputVertices(Map<Vertex, EdgeProperty> outVertices);
-
-  Map<Vertex, EdgeProperty> getInputVertices();
-  Map<Vertex, EdgeProperty> getOutputVertices();
-
-  List<InputSpec> getInputSpecList();
-  List<OutputSpec> getOutputSpecList();
+  
+  // CHANGE THESE TO LISTS AND MAINTAIN ORDER?
+  void setInputVertices(Map<Vertex, Edge> inVertices);
+  void setOutputVertices(Map<Vertex, Edge> outVertices);
+
+  Map<Vertex, Edge> getInputVertices();
+  Map<Vertex, Edge> getOutputVertices();
+  
+  List<InputSpec> getInputSpecList(int taskIndex);
+  List<OutputSpec> getOutputSpecList(int taskIndex);
 
   int getInputVerticesCount();
   int getOutputVerticesCount();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
index 7a85eb1..4c79712 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 
-// Rename to VertexManager TEZ-364
+// Rename to VertexManager TEZ-364 and move to DAG API. Make abstract class.
 public interface VertexScheduler {
   void initialize(Configuration conf);
   void onVertexStarted();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java
new file mode 100644
index 0000000..51f6d53
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.newapi.impl.TezEvent;
+
+public class TaskEventAddTezEvent extends TaskEvent {
+
+  private TezEvent tezEvent;
+  
+  public TaskEventAddTezEvent(TezTaskID taskId, TezEvent tezEvent) {
+    super(taskId, TaskEventType.T_ADD_TEZ_EVENT);
+    this.tezEvent = tezEvent;
+  }
+
+  public TezEvent getTezEvent() {
+    return tezEvent;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
index bc7f3ff..d0ad8a0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
@@ -31,6 +31,9 @@ public enum TaskEventType {
 
   //Producer:Speculator
   T_ADD_SPEC_ATTEMPT,
+  
+  //Producer:Edge
+  T_ADD_TEZ_EVENT,
 
   //Producer:TaskAttempt
   T_ATTEMPT_LAUNCHED,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
new file mode 100644
index 0000000..c851ae0
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import java.util.List;
+
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.impl.TezEvent;
+
+public class VertexEventRouteEvent extends VertexEvent {
+  
+  final List<TezEvent> events;
+
+  public VertexEventRouteEvent(TezVertexID vertexId, List<TezEvent> events) {
+    super(vertexId, VertexEventType.V_ROUTE_EVENT);
+    this.events = events;
+  }
+  
+  public List<TezEvent> getEvents() {
+    return events;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 43cffe6..dc7e2dd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -57,4 +57,6 @@ public enum VertexEventType {
   INTERNAL_ERROR,
   V_COUNTER_UPDATE,
   
+  V_ROUTE_EVENT,
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
new file mode 100644
index 0000000..71f17ac
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
@@ -0,0 +1,69 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.List;
+
+import org.apache.tez.dag.app.dag.EdgeManager;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+
+public class BroadcastEdgeManager extends EdgeManager {
+
+  @Override
+  public int getNumDestinationTaskInputs(Vertex sourceVertex,
+      int destinationTaskIndex) {
+    return sourceVertex.getTotalTasks();
+  }
+  
+  @Override
+  public int getNumSourceTaskOutputs(Vertex destinationVertex,
+      int sourceTaskIndex) {
+    return 1;
+  }
+  
+  @Override
+  public void routeEventToDestinationTasks(DataMovementEvent event,
+      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
+    event.setTargetIndex(sourceTaskIndex);
+    addAllDestinationTaskIndices(numDestinationTasks, taskIndices);
+  }
+  
+  @Override
+  public void routeEventToDestinationTasks(InputFailedEvent event,
+      int sourceTaskIndex, int numDestinationTasks , List<Integer> taskIndices) {
+    event.setTargetIndex(sourceTaskIndex);
+    addAllDestinationTaskIndices(numDestinationTasks, taskIndices);    
+  }
+  
+  @Override
+  public int routeEventToSourceTasks(int destinationTaskIndex,
+      InputReadErrorEvent event) {
+    return destinationTaskIndex;
+  }
+  
+  void addAllDestinationTaskIndices(int numDestinationTasks, List<Integer> taskIndeces) {
+    for(int i=0; i<numDestinationTasks; ++i) {
+      taskIndeces.add(new Integer(i));
+    }    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 2ddcdd7..d30d178 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -131,7 +131,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private final AppContext appContext;
 
   volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>();
-  private Map<String, EdgeProperty> edges = new HashMap<String, EdgeProperty>();
+  private Map<String, Edge> edges = new HashMap<String, Edge>();
   private TezCounters dagCounters = new TezCounters();
   private Object fullCountersLock = new Object();
   private TezCounters fullCounters = null;
@@ -836,7 +836,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           dag.addVertex(v);
         }
 
-        dag.edges = DagTypeConverters.createEdgePropertyMapFromDAGPlan(dag.getJobPlan().getEdgeList());
+        createDAGEdges(dag);
         Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(dag.getJobPlan().getEdgeList());
 
         // setup the dag
@@ -861,6 +861,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         return dag.finished(DAGState.FAILED);
       }
     }
+    
+    private void createDAGEdges(DAGImpl dag) {
+      for (EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) {
+        EdgeProperty edgeProperty = DagTypeConverters
+            .createEdgePropertyMapFromDAGPlan(edgePlan);
+        // edge manager may be also set via API when using custom edge type
+        dag.edges.put(edgePlan.getId(),
+            new Edge(edgeProperty, dag.getEventHandler()));
+      }
+    }
 
     private void assignDAGScheduler(DAGImpl dag) {
       if (dag.conf.getBoolean(TezConfiguration.TEZ_AM_AGGRESSIVE_SCHEDULING,
@@ -870,17 +880,17 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       } else {
         boolean isMRR = true;
         for (Vertex vertex : dag.vertices.values()) {
-          Map<Vertex, EdgeProperty> outVertices = vertex.getOutputVertices();
-          Map<Vertex, EdgeProperty> inVertices = vertex.getInputVertices();
+          Map<Vertex, Edge> outVertices = vertex.getOutputVertices();
+          Map<Vertex, Edge> inVertices = vertex.getInputVertices();
           if (!(outVertices == null || outVertices.isEmpty() || (outVertices
-              .size() == 1 && outVertices.values().iterator().next()
+              .size() == 1 && outVertices.values().iterator().next().getEdgeProperty()
               .getDataMovementType() == EdgeProperty.DataMovementType.SCATTER_GATHER))) {
             // more than 1 output OR single output is not bipartite
             isMRR = false;
             break;
           }
           if (!(inVertices == null || inVertices.isEmpty() || (inVertices
-              .size() == 1 && inVertices.values().iterator().next()
+              .size() == 1 && inVertices.values().iterator().next().getEdgeProperty()
               .getDataMovementType() == EdgeProperty.DataMovementType.SCATTER_GATHER))) {
             // more than 1 output OR single output is not bipartite
             isMRR = false;
@@ -924,24 +934,28 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     private void parseVertexEdges(DAGImpl dag, Map<String, EdgePlan> edgePlans, Vertex vertex) {
       VertexPlan vertexPlan = vertex.getVertexPlan();
 
-      Map<Vertex, EdgeProperty> inVertices =
-          new HashMap<Vertex, EdgeProperty>();
+      Map<Vertex, Edge> inVertices =
+          new HashMap<Vertex, Edge>();
 
-      Map<Vertex, EdgeProperty> outVertices =
-          new HashMap<Vertex, EdgeProperty>();
+      Map<Vertex, Edge> outVertices =
+          new HashMap<Vertex, Edge>();
 
       for(String inEdgeId : vertexPlan.getInEdgeIdList()){
         EdgePlan edgePlan = edgePlans.get(inEdgeId);
         Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());
-        EdgeProperty edgeProp = dag.edges.get(inEdgeId);
-        inVertices.put(inVertex, edgeProp);
+        Edge edge = dag.edges.get(inEdgeId);
+        edge.setSourceVertex(inVertex);
+        edge.setDestinationVertex(vertex);
+        inVertices.put(inVertex, edge);
       }
 
       for(String outEdgeId : vertexPlan.getOutEdgeIdList()){
         EdgePlan edgePlan = edgePlans.get(outEdgeId);
         Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());
-        EdgeProperty edgeProp = dag.edges.get(outEdgeId);
-        outVertices.put(outVertex, edgeProp);
+        Edge edge = dag.edges.get(outEdgeId);
+        edge.setSourceVertex(vertex);
+        edge.setDestinationVertex(outVertex);
+        outVertices.put(outVertex, edge);
       }
 
       vertex.setInputVertices(inVertices);
@@ -1008,7 +1022,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     vertexMap.put(v.getName(), v);
   }
 
-  Vertex getVertex(String vertexName) {
+  @Override
+  public Vertex getVertex(String vertexName) {
     return vertexMap.get(vertexName);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
new file mode 100644
index 0000000..5a8d4f6
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -0,0 +1,214 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.dag.EdgeManager;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+import org.apache.tez.engine.newapi.impl.EventMetaData;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.engine.newapi.impl.InputSpec;
+import org.apache.tez.engine.newapi.impl.OutputSpec;
+import org.apache.tez.engine.newapi.impl.TezEvent;
+
+public class Edge {
+
+  private EdgeProperty edgeProperty;
+  private EdgeManager edgeManager;
+  @SuppressWarnings("rawtypes")
+  private EventHandler eventHandler;
+  private AtomicBoolean bufferEvents = new AtomicBoolean(false);
+  private List<TezEvent> destinationEventBuffer = new ArrayList<TezEvent>();
+  private List<TezEvent> sourceEventBuffer = new ArrayList<TezEvent>();
+  private Vertex sourceVertex;
+  private Vertex destinationVertex; // this may end up being a list for shared edge
+  
+  @SuppressWarnings("rawtypes")
+  public Edge(EdgeProperty edgeProperty, EventHandler eventHandler) {
+    this.edgeProperty = edgeProperty;
+    this.eventHandler = eventHandler;
+    switch (edgeProperty.getDataMovementType()) {
+    case ONE_TO_ONE:
+      edgeManager = new OneToOneEdgeManager();
+      break;
+    case BROADCAST:
+      edgeManager = new BroadcastEdgeManager();
+      break;
+    case SCATTER_GATHER:
+      edgeManager = new ScatterGatherEdgeManager();
+      break;
+    default:
+      String message = "Unknown edge data movement type: "
+          + edgeProperty.getDataMovementType();
+      throw new TezUncheckedException(message);
+    }
+  }
+  
+  public EdgeProperty getEdgeProperty() {
+    return this.edgeProperty;
+  }
+  
+  public EdgeManager getEdgeManager() {
+    return this.edgeManager;
+  }
+  
+  public void setEdgeManager(EdgeManager edgeManager) {
+    if(edgeManager == null) {
+      throw new TezUncheckedException("Edge manager cannot be null");
+    }
+    this.edgeManager = edgeManager;
+  }
+  
+  public void setSourceVertex(Vertex sourceVertex) {
+    if (this.sourceVertex != null && this.sourceVertex != sourceVertex) {
+      throw new TezUncheckedException("Source vertex exists: "
+          + sourceVertex.getName());
+    }
+    this.sourceVertex = sourceVertex;
+  }
+
+  public void setDestinationVertex(Vertex destinationVertex) {
+    if (this.destinationVertex != null && this.destinationVertex != destinationVertex) {
+      throw new TezUncheckedException("Destination vertex exists: "
+          + destinationVertex.getName());
+    }
+    this.destinationVertex = destinationVertex;
+  }
+  
+  public InputSpec getDestinationSpec(int destinationTaskIndex) {
+    return new InputSpec(sourceVertex.getName(),
+        edgeProperty.getEdgeDestination(),
+        edgeManager.getNumDestinationTaskInputs(sourceVertex, destinationTaskIndex));
+ }
+  
+  public OutputSpec getSourceSpec(int sourceTaskIndex) {
+    return new OutputSpec(destinationVertex.getName(),
+        edgeProperty.getEdgeSource(), 
+        edgeManager.getNumSourceTaskOutputs(destinationVertex, sourceTaskIndex));
+  }
+  
+  public void startEventBuffering() {
+    bufferEvents.set(true);
+  }
+  
+  public void stopEventBuffering() {
+    // assume only 1 entity will start and stop event buffering
+    bufferEvents.set(false);
+    for(TezEvent event : destinationEventBuffer) {
+      sendTezEventToDestinationTasks(event);
+    }
+    destinationEventBuffer.clear();
+    for(TezEvent event : sourceEventBuffer) {
+      sendTezEventToSourceTasks(event);
+    }
+    sourceEventBuffer.clear();
+  }
+  
+  public void sendTezEventToSourceTasks(TezEvent tezEvent) {
+    if (bufferEvents.get()) {
+      switch (tezEvent.getEventType()) {
+      case INPUT_READ_ERROR_EVENT:
+        InputReadErrorEvent event = (InputReadErrorEvent) tezEvent.getEvent();
+        TezTaskAttemptID destAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
+        int destTaskIndex = destAttemptId.getTaskID().getId();
+        int srcTaskIndex = edgeManager.routeEventToSourceTasks(destTaskIndex, event);
+        // TODO this is BROKEN. TEZ-431
+//        TezTaskID srcTaskId = sourceVertex.getTask(srcTaskIndex).getTaskId();
+//        sendEventToTask(srcTaskId, tezEvent);
+        break;
+      default:
+        throw new TezUncheckedException("Unhandled tez event type: "
+            + tezEvent.getEventType());
+      }
+    } else {
+      sourceEventBuffer.add(tezEvent);
+    }
+  }
+  
+  public void sendTezEventToDestinationTasks(TezEvent tezEvent) {
+    if (bufferEvents.get()) {
+      List<Integer> destTaskIndices = new ArrayList<Integer>();
+      switch (tezEvent.getEventType()) {
+      case DATA_MOVEMENT_EVENT:
+        DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
+        TezTaskAttemptID dmSourceAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
+        int dmSourceTaskIndex = dmSourceAttemptId.getTaskID().getId();
+        edgeManager.routeEventToDestinationTasks(dmEvent, dmSourceTaskIndex,
+            destinationVertex.getTotalTasks(), destTaskIndices);
+        for(Integer destTaskIndex : destTaskIndices) {
+          EventMetaData destMeta = new EventMetaData(EventProducerConsumerType.INPUT, 
+              destinationVertex.getName(), 
+              sourceVertex.getName(), 
+              null); // will be filled by Task when sending the event. Is it needed?
+          destMeta.setIndex(dmEvent.getTargetIndex());
+          tezEvent.setDestinationInfo(destMeta);
+          TezTaskID destTaskId = destinationVertex.getTask(destTaskIndex).getTaskId();
+          sendEventToTask(destTaskId, tezEvent);
+        }        
+        break;
+      case INPUT_FAILED_EVENT:
+        InputFailedEvent ifEvent = (InputFailedEvent) tezEvent.getEvent();
+        TezTaskAttemptID ifSourceAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
+        int ifSourceTaskIndex = ifSourceAttemptId.getTaskID().getId();
+        edgeManager.routeEventToDestinationTasks(ifEvent, ifSourceTaskIndex,
+            destinationVertex.getTotalTasks(), destTaskIndices);
+        for(Integer destTaskIndex : destTaskIndices) {
+          EventMetaData destMeta = new EventMetaData(EventProducerConsumerType.INPUT, 
+              destinationVertex.getName(), 
+              sourceVertex.getName(), 
+              null); // will be filled by Task when sending the event. Is it needed?
+          destMeta.setIndex(ifEvent.getTargetIndex());
+          tezEvent.setDestinationInfo(destMeta);
+          TezTaskID destTaskId = destinationVertex.getTask(destTaskIndex).getTaskId();
+          sendEventToTask(destTaskId, tezEvent);
+        }        
+      default:
+        throw new TezUncheckedException("Unhandled tez event type: "
+            + tezEvent.getEventType());
+      }
+    } else {
+      destinationEventBuffer.add(tezEvent);
+    }
+  }
+  
+  private void sendEventToDestination(List<Integer> destTaskIndeces, TezEvent tezEvent) {
+    for(Integer destTaskIndex : destTaskIndeces) {
+      TezTaskID destTaskId = destinationVertex.getTask(destTaskIndex).getTaskId();
+      sendEventToTask(destTaskId, tezEvent);
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  private void sendEventToTask(TezTaskID taskId, TezEvent tezEvent) {
+    eventHandler.handle(new TaskEventAddTezEvent(taskId, tezEvent));
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
new file mode 100644
index 0000000..7c4743e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
@@ -0,0 +1,66 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.List;
+
+import org.apache.tez.dag.app.dag.EdgeManager;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+
+public class OneToOneEdgeManager extends EdgeManager {
+
+  @Override
+  public int getNumDestinationTaskInputs(Vertex sourceVertex,
+      int destinationTaskIndex) {
+    return 1;
+  }
+  
+  @Override
+  public int getNumSourceTaskOutputs(Vertex destinationVertex,
+      int sourceTaskIndex) {
+    return 1;
+  }
+  
+  @Override
+  public void routeEventToDestinationTasks(DataMovementEvent event,
+      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
+    event.setTargetIndex(0);
+    addDestinationTaskIndex(sourceTaskIndex, taskIndices);
+  }
+  
+  @Override
+  public void routeEventToDestinationTasks(InputFailedEvent event,
+      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
+    event.setTargetIndex(0);
+    addDestinationTaskIndex(sourceTaskIndex, taskIndices);
+  }
+  
+  @Override
+  public int routeEventToSourceTasks(int destinationTaskIndex,
+      InputReadErrorEvent event) {
+    return destinationTaskIndex;
+  }
+  
+  void addDestinationTaskIndex(int sourceTaskIndex, List<Integer> taskIndeces) {
+    taskIndeces.add(new Integer(sourceTaskIndex));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
new file mode 100644
index 0000000..380b6b6
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
@@ -0,0 +1,77 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.List;
+
+import org.apache.tez.dag.app.dag.EdgeManager;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+
+public class ScatterGatherEdgeManager extends EdgeManager {
+
+  private int initialDestinationTaskNumber = -1;
+
+  @Override
+  public int getNumDestinationTaskInputs(Vertex sourceVertex,
+      int destinationTaskIndex) {
+    return sourceVertex.getTotalTasks();
+  }
+  
+  @Override
+  public int getNumSourceTaskOutputs(Vertex destinationVertex,
+      int sourceTaskIndex) {
+    if(initialDestinationTaskNumber == -1) {
+      // the downstream vertex may not have started and so its number of tasks
+      // may change. So save this initial count and provide a consistent view 
+      // to all source tasks, including late starters and retries.
+      // When the number of destination tasks change then the routing will have 
+      // to be updated too.
+      // This value may be obtained from config too if destination task initial 
+      // parallelism is not specified.
+      initialDestinationTaskNumber = destinationVertex.getTotalTasks();
+    }
+    return initialDestinationTaskNumber;
+  }
+
+  @Override
+  public void routeEventToDestinationTasks(DataMovementEvent event,
+      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
+    int destinationTaskIndex = event.getSourceIndex();
+    event.setTargetIndex(sourceTaskIndex);
+    taskIndices.add(new Integer(destinationTaskIndex));
+  }
+
+  @Override
+  public void routeEventToDestinationTasks(InputFailedEvent event,
+      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
+    int destinationTaskIndex = event.getSourceIndex();
+    event.setTargetIndex(sourceTaskIndex);
+    taskIndices.add(new Integer(destinationTaskIndex));
+  }
+
+  @Override
+  public int routeEventToSourceTasks(int destinationTaskIndex,
+      InputReadErrorEvent event) {
+    return event.getIndex();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
index bb6e2ee..b854a43 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
@@ -30,15 +30,18 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.app.dag.EdgeManager;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexScheduler;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 
@@ -72,9 +75,10 @@ public class ShuffleVertexManager implements VertexScheduler {
   
   public ShuffleVertexManager(Vertex managedVertex) {
     this.managedVertex = managedVertex;
-    Map<Vertex, EdgeProperty> inputs = managedVertex.getInputVertices();
-    for(Map.Entry<Vertex, EdgeProperty> entry : inputs.entrySet()) {
-      if(entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+    Map<Vertex, Edge> inputs = managedVertex.getInputVertices();
+    for(Map.Entry<Vertex, Edge> entry : inputs.entrySet()) {
+      if (entry.getValue().getEdgeProperty().getDataMovementType() == 
+          DataMovementType.SCATTER_GATHER) {
         Vertex vertex = entry.getKey();
         bipartiteSources.put(vertex.getVertexId(), vertex);
       }
@@ -86,6 +90,82 @@ public class ShuffleVertexManager implements VertexScheduler {
     // dynamically changed as the DAG progresses.
   }
   
+  
+  public class CustomEdgeManager extends EdgeManager {
+    int numSourceTaskOutputs;
+    int numDestinationTasks;
+    int basePartitionRange;
+    int remainderRangeForLastShuffler;
+    
+    CustomEdgeManager(int numSourceTaskOutputs, int numDestinationTasks,
+        int basePartitionRange, int remainderPartitionForLastShuffler) {
+      this.numSourceTaskOutputs = numSourceTaskOutputs;
+      this.numDestinationTasks = numDestinationTasks;
+      this.basePartitionRange = basePartitionRange;
+      this.remainderRangeForLastShuffler = remainderPartitionForLastShuffler;
+    }
+
+    @Override
+    public int getNumDestinationTaskInputs(Vertex sourceVertex,
+        int destinationTaskIndex) {
+      int partitionRange = 1;
+      if(destinationTaskIndex < numDestinationTasks-1) {
+        partitionRange = basePartitionRange;
+      } else {
+        partitionRange = remainderRangeForLastShuffler;
+      }
+      return sourceVertex.getTotalTasks() * partitionRange;
+    }
+
+    @Override
+    public int getNumSourceTaskOutputs(Vertex destinationVertex,
+        int sourceTaskIndex) {
+      return numSourceTaskOutputs;
+    }
+    
+    @Override
+    public void routeEventToDestinationTasks(DataMovementEvent event,
+        int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
+      int sourceIndex = event.getSourceIndex();
+      int destinationTaskIndex = sourceIndex/basePartitionRange;
+      
+      // all inputs from a source task are next to each other in original order
+      int targetIndex = 
+          sourceTaskIndex * basePartitionRange 
+          + sourceIndex % basePartitionRange;
+      
+      event.setTargetIndex(targetIndex);
+      taskIndices.add(new Integer(destinationTaskIndex));
+    }
+
+    @Override
+    public void routeEventToDestinationTasks(InputFailedEvent event,
+        int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
+      int sourceIndex = event.getSourceIndex();
+      int destinationTaskIndex = sourceIndex/basePartitionRange;
+      
+      int targetIndex = 
+          sourceTaskIndex * basePartitionRange 
+          + sourceIndex % basePartitionRange;
+      
+      event.setTargetIndex(targetIndex);
+      taskIndices.add(new Integer(destinationTaskIndex));
+    }
+
+    @Override
+    public int routeEventToSourceTasks(int destinationTaskIndex,
+        InputReadErrorEvent event) {
+      int partitionRange = 1;
+      if(destinationTaskIndex < numDestinationTasks-1) {
+        partitionRange = basePartitionRange;
+      } else {
+        partitionRange = remainderRangeForLastShuffler;
+      }
+      return event.getIndex()/partitionRange;
+    }
+  }
+
+  
   @Override
   public void onVertexStarted() {
     pendingTasks = new ArrayList<TezTaskID>(managedVertex.getTotalTasks());
@@ -140,7 +220,7 @@ public class ShuffleVertexManager implements VertexScheduler {
     }
     numSourceTasks = numSrcTasks;
   }
-  
+
   void determineParallelismAndApply() {
     if(numSourceTasksCompleted == 0) {
       return;
@@ -199,7 +279,15 @@ public class ShuffleVertexManager implements VertexScheduler {
         throw new TezUncheckedException(e);
       }
       
-      managedVertex.setParallelism(finalTaskParallelism, taskConfs);
+      Map<Vertex, EdgeManager> edgeManagers = new HashMap<Vertex, EdgeManager>(
+          bipartiteSources.size());
+      for(Vertex vertex : bipartiteSources.values()) {
+        edgeManagers.put(vertex, new CustomEdgeManager(currentParallelism,
+            finalTaskParallelism, basePartitionRange,
+            remainderRangeForLastShuffler));
+      }
+      
+      managedVertex.setParallelism(finalTaskParallelism, edgeManagers);
       updatePendingTasks();      
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 00ef9e5..30bb1eb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -297,9 +297,10 @@ public class TaskAttemptImpl implements TaskAttempt,
     Vertex vertex = getVertex();
     ProcessorDescriptor procDesc = vertex.getProcessorDescriptor();
     DAG dag = vertex.getDAG();
+    int taskId = getTaskID().getId();
     return new TaskSpec(getID(), dag.getUserName(),
-        vertex.getName(), procDesc, vertex.getInputSpecList(),
-        vertex.getOutputSpecList());
+        vertex.getName(), procDesc, vertex.getInputSpecList(taskId),
+        vertex.getOutputSpecList(taskId));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index a43453c..b66760c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -18,9 +18,11 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -60,6 +62,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
@@ -71,6 +74,7 @@ import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.impl.TezEvent;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -102,6 +106,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   protected boolean encryptedShuffle;
   protected TaskLocationHint locationHint;
+  
+  private List<TezEvent> tezEventsForTaskAttempts = new ArrayList<TezEvent>();
+  private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS = 
+      new ArrayList(0);
+  
 
   // counts the number of attempts that are either running or in a state where
   //  they will come to be running when they get a Container
@@ -113,6 +122,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
      ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
   private static final SingleArcTransition<TaskImpl, TaskEvent>
      KILL_TRANSITION = new KillTransition();
+  private static final SingleArcTransition<TaskImpl, TaskEvent>
+     ADD_TEZ_EVENT_TRANSITION = new AddTezEventTransition();
 
   private static final StateMachineFactory
                <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
@@ -128,6 +139,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
             TaskEventType.T_TERMINATE,
             new KillNewTransition())
+    .addTransition(TaskStateInternal.NEW, TaskStateInternal.NEW,
+        TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
+
 
     // Transitions from SCHEDULED state
       //when the first attempt is launched, the task state is set to RUNNING
@@ -142,6 +156,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED),
         TaskEventType.T_ATTEMPT_FAILED,
         new AttemptFailedTransition())
+    .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED,
+        TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
 
     // Transitions from RUNNING state
     .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
@@ -168,12 +184,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.KILL_WAIT,
         TaskEventType.T_TERMINATE,
         KILL_TRANSITION)
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
+        TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
+
 
     // Transitions from KILL_WAIT state
     .addTransition(TaskStateInternal.KILL_WAIT,
         EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
         TaskEventType.T_ATTEMPT_KILLED,
         new KillWaitAttemptKilledTransition())
+    .addTransition(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILL_WAIT,
+        TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
     // Ignore-able transitions.
     .addTransition(
         TaskStateInternal.KILL_WAIT,
@@ -196,6 +217,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     .addTransition(TaskStateInternal.SUCCEEDED, //only possible for map tasks
         EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED),
         TaskEventType.T_ATTEMPT_KILLED, new MapRetroactiveKilledTransition())
+    .addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
+        TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
     // Ignore-able transitions.
     .addTransition(
         TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
@@ -204,12 +227,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
     // Transitions from FAILED state
     .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
+        TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
+    .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
         EnumSet.of(
             TaskEventType.T_TERMINATE,
             TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // Transitions from KILLED state
     .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
+        TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
+    .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
         EnumSet.of(
             TaskEventType.T_TERMINATE,
             TaskEventType.T_ADD_SPEC_ATTEMPT))
@@ -415,6 +442,46 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       readLock.unlock();
     }
   }
+  
+  @Override
+  public List<TezEvent> getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
+      int fromEventId, int maxEvents) {
+    List<TezEvent> events = EMPTY_TASK_ATTEMPT_TEZ_EVENTS;
+    readLock.lock();
+    try {
+      if (tezEventsForTaskAttempts.size() > fromEventId) {
+        int actualMax = Math.min(maxEvents,
+            (tezEventsForTaskAttempts.size() - fromEventId));
+        events = Collections.unmodifiableList(tezEventsForTaskAttempts.subList(
+            fromEventId, actualMax + fromEventId));
+        // currently not modifying the events so that we dont have to create 
+        // copies of events. e.g. if we have to set taskAttemptId into the TezEvent
+        // destination metadata then we will need to create a copy of the TezEvent
+        // and then modify the metadata and then send the copy on the RPC. This 
+        // is important because TezEvents are only routed in the AM and not copied 
+        // during routing. So e.g. a broadcast edge will send the same event to 
+        // all consumers (like it should). If copies were created then re-routing 
+        // the events on parallelism changes would be difficult. We would have to 
+        // buffer the events in the Vertex until the parallelism was set and then 
+        // route the events.
+      }
+      return events;
+    } finally {
+      readLock.unlock();
+    }    
+  }
+  
+  @Override 
+  public List<TezEvent> getAndClearTaskTezEvents() {
+    readLock.lock();
+    try {
+      List<TezEvent> events = tezEventsForTaskAttempts;
+      tezEventsForTaskAttempts = new ArrayList<TezEvent>(); 
+      return events;
+    } finally {
+      readLock.unlock();
+    }        
+  }
 
   @VisibleForTesting
   public TaskStateInternal getInternalState() {
@@ -1078,6 +1145,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
           logMsg));
     }
   }
+  
+  private static class AddTezEventTransition 
+      implements SingleArcTransition<TaskImpl, TaskEvent> {
+    @Override
+    public void transition(TaskImpl task, TaskEvent event) {
+      TaskEventAddTezEvent addEvent = (TaskEventAddTezEvent) event;
+      task.tezEventsForTaskAttempts.add(addEvent.getTezEvent());
+    }
+  }
 
   private static class KillTransition
     implements SingleArcTransition<TaskImpl, TaskEvent> {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a040ff2..1186caa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -23,11 +23,13 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -52,7 +54,6 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -71,6 +72,7 @@ import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.EdgeManager;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskTerminationCause;
 import org.apache.tez.dag.app.dag.Vertex;
@@ -87,6 +89,7 @@ import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
@@ -101,12 +104,15 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.newapi.impl.EventMetaData;
 import org.apache.tez.engine.newapi.impl.InputSpec;
 import org.apache.tez.engine.newapi.impl.OutputSpec;
+import org.apache.tez.engine.newapi.impl.TezEvent;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
 
@@ -177,6 +183,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   private static final InternalErrorTransition
       INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+  private static final RouteEventTransition
+      ROUTE_EVENT_TRANSITION = new RouteEventTransition();
   private static final TaskAttemptCompletedEventTransition
       TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
           new TaskAttemptCompletedEventTransition();
@@ -244,6 +252,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               VertexState.RUNNING,
               VertexState.ERROR, VertexEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          .addTransition(
+              VertexState.RUNNING,
+              VertexState.RUNNING, VertexEventType.V_ROUTE_EVENT,
+              ROUTE_EVENT_TRANSITION)
 
           // Transitions from TERMINATING state.
           .addTransition
@@ -345,8 +357,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   // For committer
   private final VertexContext vertexContext;
 
-  private Map<Vertex, EdgeProperty> sourceVertices;
-  private Map<Vertex, EdgeProperty> targetVertices;
+  @VisibleForTesting
+  Map<Vertex, Edge> sourceVertices;
+  private Map<Vertex, Edge> targetVertices;
 
   private VertexScheduler vertexScheduler;
 
@@ -457,6 +470,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   @Override
+  public Task getTask(int taskIndex) {
+    readLock.lock();
+    try {
+      // does it matter to create a duplicate list for efficiency
+      // instead of traversing the map
+      // local assign to LinkedHashMap to ensure that sequential traversal 
+      // assumption is satisfied
+      LinkedHashMap<TezTaskID, Task> taskList = tasks;
+      int i=0; 
+      for(Map.Entry<TezTaskID, Task> entry : taskList.entrySet()) {
+        if(taskIndex == i) {
+          return entry.getValue();
+        }
+      }
+      return null;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
   public int getTotalTasks() {
     return numTasks;
   }
@@ -658,12 +692,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   @Override
-  public void setParallelism(int parallelism, List<byte[]> taskUserPayloads) {
+  public void setParallelism(int parallelism,
+      Map<Vertex, EdgeManager> sourceEdgeManagers) {
     writeLock.lock();
     try {
-      Preconditions.checkArgument(
-          taskUserPayloads == null || taskUserPayloads.size() == parallelism,
-          "Userpayload must be set for all tasks or set to null");
       if (parallelism >= numTasks) {
         // not that hard to support perhaps. but checking right now since there
         // is no use case for it and checking may catch other bugs.
@@ -674,7 +706,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         LOG.info("Ingoring setParallelism to current value: " + parallelism);
         return;
       }
-
+      
+      // start buffering incoming events so that we can re-route existing events
+      for (Edge edge : sourceVertices.values()) {
+        edge.startEventBuffering();
+      }
+      
+      // Use a set since the same event may have been sent to multiple tasks
+      // and we want to avoid duplicates
+      Set<TezEvent> pendingEvents = new HashSet<TezEvent>();
+      
       LOG.info("Vertex " + getVertexId() + " parallelism set to " + parallelism);
       // assign to local variable of LinkedHashMap to make sure that changing
       // type of task causes compile error. We depend on LinkedHashMap for order
@@ -691,6 +732,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               "All tasks must be in initial state when changing parallelism"
                   + " for vertex: " + getVertexId() + " name: " + getName());
         }
+        pendingEvents.addAll(task.getAndClearTaskTezEvents());
         if (i <= parallelism) {
           continue;
         }
@@ -698,13 +740,44 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         iter.remove();
       }
       this.numTasks = parallelism;
-      if (taskUserPayloads != null) {
-        this.taskUserPayloads = new ArrayList<byte[]>(taskUserPayloads);
-      }
       assert tasks.size() == numTasks;
+
+      // set new edge managers
+      if(sourceEdgeManagers != null) {
+        for(Map.Entry<Vertex, EdgeManager> entry : sourceEdgeManagers.entrySet()) {
+          Vertex sourceVertex = entry.getKey();
+          EdgeManager edgeManager = entry.getValue();
+          Edge edge = sourceVertices.get(sourceVertex);
+          LOG.info("Replacing edge manager for source:" 
+              + sourceVertex.getVertexId() + " destination: " + getVertexId());
+          edge.setEdgeManager(edgeManager);
+        }
+      }
+      
+      // Re-route all existing TezEvents according to new routing table
+      // At this point only events attributed to source task attempts can be 
+      // re-routed. e.g. DataMovement or InputFailed events.  
+      // This assumption is fine for now since these tasks haven't been started.
+      // So they can only get events generated from source task attempts that 
+      // have already been started.
+      DAG dag = getDAG();
+      for(TezEvent event : pendingEvents) {
+        TezVertexID sourceVertexId = event.getSourceInfo().getTaskAttemptID()
+            .getTaskID().getVertexID(); 
+        Vertex sourceVertex = dag.getVertex(sourceVertexId);
+        Edge sourceEdge = sourceVertices.get(sourceVertex);
+        sourceEdge.sendTezEventToDestinationTasks(event);
+      }
+      
+      // stop buffering events
+      for (Edge edge : sourceVertices.values()) {
+        edge.stopEventBuffering();
+      }
+
     } finally {
       writeLock.unlock();
     }
+    
   }
 
   @Override
@@ -957,10 +1030,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         // create the Tasks but don't start them yet
         createTasks(vertex);
 
+        // TODO get this from API
         boolean hasBipartite = false;
         if (vertex.sourceVertices != null) {
-          for (EdgeProperty edgeProperty : vertex.sourceVertices.values()) {
-            if (edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+          for (Edge edge : vertex.sourceVertices.values()) {
+            if (edge.getEdgeProperty().getDataMovementType() == 
+                      DataMovementType.SCATTER_GATHER) {
               hasBipartite = true;
               break;
             }
@@ -1344,6 +1419,55 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     diagnostics.add(diag);
   }
 
+  private static class RouteEventTransition  implements
+  SingleArcTransition<VertexImpl, VertexEvent> {
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      VertexEventRouteEvent rEvent = (VertexEventRouteEvent) event;
+      List<TezEvent> tezEvents = rEvent.getEvents();
+      for(TezEvent tezEvent : tezEvents) {
+        switch(tezEvent.getEventType()) {
+        case DATA_MOVEMENT_EVENT:
+          {
+            EventMetaData sourceMeta = tezEvent.getSourceInfo();
+            TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
+            DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
+            dmEvent.setVersion(srcTaId.getId());
+            assert sourceMeta.getTaskVertexName().equals(vertex.getName());
+            Edge destEdge = vertex.targetVertices.get(vertex.getDAG().getVertex(
+                sourceMeta.getEdgeVertexName()));
+            destEdge.sendTezEventToDestinationTasks(tezEvent);
+          }
+          break;
+        case INPUT_FAILED_EVENT:
+        {
+          EventMetaData sourceMeta = tezEvent.getSourceInfo();
+          TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
+          InputFailedEvent ifEvent = (InputFailedEvent) tezEvent.getEvent();
+          ifEvent.setVersion(srcTaId.getId());
+          assert sourceMeta.getTaskVertexName().equals(vertex.getName());
+          Edge destEdge = vertex.targetVertices.get(vertex.getDAG().getVertex(
+              sourceMeta.getEdgeVertexName()));
+          destEdge.sendTezEventToDestinationTasks(tezEvent);
+        }
+        break;
+        case INPUT_READ_ERROR_EVENT:
+          {
+            EventMetaData sourceMeta = tezEvent.getSourceInfo();
+            assert sourceMeta.getTaskVertexName().equals(vertex.getName());
+            Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
+                sourceMeta.getEdgeVertexName()));
+            srcEdge.sendTezEventToSourceTasks(tezEvent);
+          }
+          break;
+        default:
+          throw new TezUncheckedException("Unhandled tez event type: "
+              + tezEvent.getEventType());
+        }
+      }
+    }
+  }
+  
   private static class InternalErrorTransition implements
       SingleArcTransition<VertexImpl, VertexEvent> {
     @Override
@@ -1359,12 +1483,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   @Override
-  public void setInputVertices(Map<Vertex, EdgeProperty> inVertices) {
+  public void setInputVertices(Map<Vertex, Edge> inVertices) {
     this.sourceVertices = inVertices;
   }
 
   @Override
-  public void setOutputVertices(Map<Vertex, EdgeProperty> outVertices) {
+  public void setOutputVertices(Map<Vertex, Edge> outVertices) {
     this.targetVertices = outVertices;
   }
 
@@ -1395,12 +1519,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   @Override
-  public Map<Vertex, EdgeProperty> getInputVertices() {
+  public Map<Vertex, Edge> getInputVertices() {
     return Collections.unmodifiableMap(this.sourceVertices);
   }
 
   @Override
-  public Map<Vertex, EdgeProperty> getOutputVertices() {
+  public Map<Vertex, Edge> getOutputVertices() {
     return Collections.unmodifiableMap(this.targetVertices);
   }
 
@@ -1448,12 +1572,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   // TODO Eventually remove synchronization.
   @Override
-  public synchronized List<InputSpec> getInputSpecList() {
+  public synchronized List<InputSpec> getInputSpecList(int taskIndex) {
     inputSpecList = new ArrayList<InputSpec>(
         this.getInputVerticesCount());
-    for (Entry<Vertex, EdgeProperty> entry : this.getInputVertices().entrySet()) {
-      InputSpec inputSpec = new InputSpec(entry.getKey().getName(),
-          entry.getValue().getEdgeDestination(), entry.getKey().getTotalTasks());
+    for (Entry<Vertex, Edge> entry : this.getInputVertices().entrySet()) {
+      InputSpec inputSpec = entry.getValue().getDestinationSpec(taskIndex);
       if (LOG.isDebugEnabled()) {
         LOG.debug("For vertex : " + this.getName()
             + ", Using InputSpec : " + inputSpec);
@@ -1466,16 +1589,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   // TODO Eventually remove synchronization.
   @Override
-  public synchronized List<OutputSpec> getOutputSpecList() {
+  public synchronized List<OutputSpec> getOutputSpecList(int taskIndex) {
     if (this.outputSpecList == null) {
       outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount());
-      for (Entry<Vertex, EdgeProperty> entry : this.getOutputVertices().entrySet()) {
-        OutputSpec outputSpec = new OutputSpec(entry.getKey().getName(),
-            entry.getValue().getEdgeSource(), entry.getKey().getTotalTasks());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("For vertex : " + this.getName()
-              + ", Using OutputSpec : " + outputSpec);
-        }
+      for (Entry<Vertex, Edge> entry : this.getOutputVertices().entrySet()) {
+        OutputSpec outputSpec = entry.getValue().getSourceSpec(taskIndex);
         // TODO DAGAM This should be based on the edge type.
         outputSpecList.add(outputSpec);
       }