You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/05/22 08:26:38 UTC

tez git commit: TEZ-2409. Allow different edges to have different routing plugins (bikas)

Repository: tez
Updated Branches:
  refs/heads/master aa6a84c2c -> 41173aa0a


TEZ-2409. Allow different edges to have different routing plugins (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/41173aa0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/41173aa0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/41173aa0

Branch: refs/heads/master
Commit: 41173aa0a0cc60924902ce9e78a452e2069a2a31
Parents: aa6a84c
Author: Bikas Saha <bi...@apache.org>
Authored: Thu May 21 23:26:25 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu May 21 23:26:25 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/dag/app/TaskAttemptEventInfo.java       |   8 +-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |   3 +-
 .../java/org/apache/tez/dag/app/dag/Task.java   |   3 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   2 +-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |   8 +-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |   9 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  93 +++++++----------
 .../apache/tez/dag/app/MockDAGAppMaster.java    |   9 +-
 .../tez/dag/app/TestMockDAGAppMaster.java       |  23 ++---
 .../app/TestTaskAttemptListenerImplTezDag.java  |   4 +-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 100 ++++++++++++++-----
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  52 ++++++----
 .../org/apache/tez/runtime/RuntimeTask.java     |  10 ++
 .../runtime/api/impl/TezHeartbeatRequest.java   |  13 ++-
 .../runtime/api/impl/TezHeartbeatResponse.java  |  12 +++
 .../apache/tez/runtime/task/TaskReporter.java   |   6 +-
 17 files changed, 221 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/41173aa0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0f248b9..995bb6c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -29,6 +29,7 @@ ALL CHANGES:
   TEZ-2455. Tez UI: Dag view caching, error handling and minor layout changes
   TEZ-2453. Tez UI: show the dagInfo is the application has set the same.
   TEZ-2447. Tez UI: Generic changes based on feedbacks.
+  TEZ-2409. Allow different edges to have different routing plugins
 
 Release 0.7.0: 2015-05-18
 

http://git-wip-us.apache.org/repos/asf/tez/blob/41173aa0/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java
index 49ff044..d6b03e4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java
@@ -23,10 +23,12 @@ import org.apache.tez.runtime.api.impl.TezEvent;
 
 public class TaskAttemptEventInfo {
   private final int nextFromEventId;
+  private final int nextPreRoutedFromEventId;
   private final List<TezEvent> events;
   
-  public TaskAttemptEventInfo(int nextFromEventId, List<TezEvent> events) {
+  public TaskAttemptEventInfo(int nextFromEventId, List<TezEvent> events, int nextPreRoutedEventId) {
     this.nextFromEventId = nextFromEventId;
+    this.nextPreRoutedFromEventId = nextPreRoutedEventId;
     this.events = events;
   }
   
@@ -34,6 +36,10 @@ public class TaskAttemptEventInfo {
     return nextFromEventId;
   }
   
+  public int getNextPreRoutedFromEventId() {
+    return nextPreRoutedFromEventId;
+  }
+
   public List<TezEvent> getEvents() {
     return events;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/41173aa0/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 970489d..2bf7de3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -448,9 +448,10 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
             .getCurrentDAG()
             .getVertex(taskAttemptID.getTaskID().getVertexID())
             .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(),
-                request.getMaxEvents());
+                request.getPreRoutedStartIndex(), request.getMaxEvents());
         response.setEvents(eventInfo.getEvents());
         response.setNextFromEventId(eventInfo.getNextFromEventId());
+        response.setNextPreRoutedEventId(eventInfo.getNextPreRoutedFromEventId());
       }
       containerInfo.lastRequestId = requestId;
       containerInfo.lastReponse = response;

http://git-wip-us.apache.org/repos/asf/tez/blob/41173aa0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index 177ee8a..47b56f2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.app.dag;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -59,7 +60,7 @@ public interface Task {
   
   public Vertex getVertex();
   
-  public List<TezEvent> getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
+  public ArrayList<TezEvent> getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
       int fromEventId, int maxEvents);
   
   public List<String> getDiagnostics();

http://git-wip-us.apache.org/repos/asf/tez/blob/41173aa0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index bb42392..bb3548d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -147,7 +147,7 @@ public interface Vertex extends Comparable<Vertex> {
   Resource getTaskResource();
   
   public TaskAttemptEventInfo getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
-      int fromEventId, int maxEvents);
+      int fromEventId, int nextPreRoutedFromEventId, int maxEvents);
   
   void handleSpeculatorEvent(SpeculatorEvent event);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/41173aa0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 78bab05..f9cbede 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -197,7 +197,7 @@ public class Edge {
     setEdgeProperty(modifiedEdgeProperty);
   }
   
-  public synchronized boolean routingToBegin() throws AMUserCodeException {
+  public synchronized void routingToBegin() throws AMUserCodeException {
     if (edgeManagerContext.getDestinationVertexNumTasks() == 0) {
       routingNeeded = false;
     } else if (edgeManagerContext.getDestinationVertexNumTasks() < 0) {
@@ -216,7 +216,11 @@ public class Edge {
       }
     }
     
-    LOG.info("Routing to begin for edge: " + getEdgeInfo() + ". EdgeProperty: " + edgeProperty);
+    LOG.info("Routing to begin for edge: " + getEdgeInfo() + ". EdgeProperty: " + edgeProperty + 
+        " onDemandRouting: " + hasOnDemandRouting());
+  }
+  
+  public synchronized boolean hasOnDemandRouting() {
     return onDemandRouting;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/41173aa0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index de5ab2a..b2eb81e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -138,7 +138,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   final StateChangeNotifier stateChangeNotifier;
 
   private final List<TezEvent> tezEventsForTaskAttempts = new ArrayList<TezEvent>();
-  private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
+  static final ArrayList<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
       new ArrayList(0);
 
   // track the status of TaskAttempt (true mean completed, false mean uncompleted)
@@ -485,9 +485,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
   @Override
-  public List<TezEvent> getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
+  public ArrayList<TezEvent> getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
       int fromEventId, int maxEvents) {
-    List<TezEvent> events = EMPTY_TASK_ATTEMPT_TEZ_EVENTS;
+    ArrayList<TezEvent> events = EMPTY_TASK_ATTEMPT_TEZ_EVENTS;
     readLock.lock();
 
     try {
@@ -500,8 +500,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         int actualMax = Math.min(maxEvents,
             (tezEventsForTaskAttempts.size() - fromEventId));
         int toEventId = actualMax + fromEventId;
-        events = Collections.unmodifiableList(new ArrayList<TezEvent>(
-            tezEventsForTaskAttempts.subList(fromEventId, toEventId)));
+        events = new ArrayList<TezEvent>(tezEventsForTaskAttempts.subList(fromEventId, toEventId));
         LOG.info("TaskAttempt:" + attemptID + " sent events: (" + fromEventId
             + "-" + toEventId + ")");
         // currently not modifying the events so that we dont have to create

http://git-wip-us.apache.org/repos/asf/tez/blob/41173aa0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index e20ee6e..e909c9f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -228,9 +228,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   private Configuration vertexConf;
   
   private final boolean isSpeculationEnabled;
-  
-  @VisibleForTesting
-  public boolean useOnDemandRouting = true;
 
   //fields initialized in init
 
@@ -742,8 +739,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   private final Lock onDemandRouteEventsReadLock = onDemandRouteEventsReadWriteLock.readLock();
   private final Lock onDemandRouteEventsWriteLock = onDemandRouteEventsReadWriteLock.writeLock();
   
-  private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
-      new ArrayList(0);
   List<TezEvent> pendingRouteEvents = new LinkedList<TezEvent>();
   List<TezTaskAttemptID> pendingReportedSrcCompletions = Lists.newLinkedList();
 
@@ -1419,26 +1414,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     }
   }
   
-  boolean setupEdgeRouting() throws AMUserCodeException {
-    boolean doOnDemand = useOnDemandRouting;
+  void setupEdgeRouting() throws AMUserCodeException {
     for (Edge e : sourceVertices.values()) {
-      boolean edgeDoingOnDemand = e.routingToBegin();
-      if (doOnDemand && !edgeDoingOnDemand) {
-        doOnDemand = false;
-        LOG.info("Not using ondemand routing because of edge between " + e.getSourceVertexName()
-            + " and " + getLogIdentifier());
-      }
+      e.routingToBegin();
     }
-    return doOnDemand;
   }
   
   private void unsetTasksNotYetScheduled() throws AMUserCodeException {
     if (tasksNotYetScheduled) {
-      boolean doOnDemand = setupEdgeRouting();
+      setupEdgeRouting();
       // change state under lock
       writeLock.lock();
       try {
-        useOnDemandRouting = doOnDemand;
         tasksNotYetScheduled = false;
         // only now can we be sure of the edge manager type. so until now
         // we will accumulate pending tasks in case legacy routing gets used.
@@ -4053,20 +4040,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   
   @Override
   public TaskAttemptEventInfo getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
-      int fromEventId, int maxEvents) {
-    if (!useOnDemandRouting) {
-      List<TezEvent> events = getTask(attemptID.getTaskID()).getTaskAttemptTezEvents(attemptID, fromEventId, maxEvents);
-      return new TaskAttemptEventInfo(fromEventId + events.size(), events);
-    }
-
+      int fromEventId, int preRoutedFromEventId, int maxEvents) {
+    ArrayList<TezEvent> events = getTask(attemptID.getTaskID()).getTaskAttemptTezEvents(
+        attemptID, preRoutedFromEventId, maxEvents);
+    int nextPreRoutedFromEventId = preRoutedFromEventId + events.size();
+    int nextFromEventId = fromEventId;
     onDemandRouteEventsReadLock.lock();
     try {
-      List<TezEvent> events = EMPTY_TASK_ATTEMPT_TEZ_EVENTS;
-      int nextFromEventId = fromEventId;
       int currEventCount = onDemandRouteEvents.size();
       try {
         if (currEventCount > fromEventId) {
-          events = Lists.newArrayListWithCapacity(maxEvents);
+          if (events != TaskImpl.EMPTY_TASK_ATTEMPT_TEZ_EVENTS) {
+            events.ensureCapacity(maxEvents);
+          } else {
+            events = Lists.newArrayListWithCapacity(maxEvents);
+          }
+          int numPreRoutedEvents = events.size();
           int taskIndex = attemptID.getTaskID().getId();
           Preconditions.checkState(taskIndex < tasks.size(), "Invalid task index for TA: " + attemptID
               + " vertex: " + getLogIdentifier());
@@ -4118,6 +4107,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
               break;
             }
           }
+          int numEventsSent = events.size() - numPreRoutedEvents;
+          if (numEventsSent > 0) {
+            StringBuilder builder = new StringBuilder();
+            builder.append("Sending ").append(attemptID).append(" numEvents: ").append(numEventsSent)
+            .append(" from: ").append(fromEventId).append(" to: ").append(nextFromEventId)
+            .append(" out of ").append(currEventCount).append(" on-demand events in vertex: ")
+            .append(getLogIdentifier());
+            LOG.info(builder.toString());
+          }
         }
       } catch (AMUserCodeException e) {
         String msg = "Exception in " + e.getSource() + ", vertex=" + getLogIdentifier();
@@ -4126,18 +4124,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         nextFromEventId = fromEventId;
         events.clear();
       }
-  
-      if (events.size() > 0) {
-        StringBuilder builder = new StringBuilder();
-        builder.append("Sending ").append(attemptID).append(" numEvents: ").append(events.size())
-        .append(" from: ").append(fromEventId).append(" to: ").append(nextFromEventId)
-        .append(" out of ").append(currEventCount).append(" events in vertex: ").append(getLogIdentifier());
-        LOG.info(builder.toString());
-      }
-      return new TaskAttemptEventInfo(nextFromEventId, events);
     } finally {
       onDemandRouteEventsReadLock.unlock();
     }
+    return new TaskAttemptEventInfo(nextFromEventId, events, nextPreRoutedFromEventId);
   }
 
   private void handleRoutedTezEvents(List<TezEvent> tezEvents, boolean recovered, boolean isPendingEvents) throws AMUserCodeException {
@@ -4206,15 +4196,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
               pendingTaskEvents.add(tezEvent);
             } else {
               // event not from this vertex. must have come from source vertex.
-              if (useOnDemandRouting) {
-                int srcTaskIndex = sourceMeta.getTaskAttemptID().getTaskID().getId();
-                Vertex edgeVertex = getDAG().getVertex(sourceMeta.getTaskVertexName());
-                Edge srcEdge = sourceVertices.get(edgeVertex);
-                if (srcEdge == null) {
-                  throw new TezUncheckedException("Bad source vertex: " +
-                      sourceMeta.getTaskVertexName() + " for destination vertex: " +
-                      getLogIdentifier());
-                }
+              int srcTaskIndex = sourceMeta.getTaskAttemptID().getTaskID().getId();
+              Vertex edgeVertex = getDAG().getVertex(sourceMeta.getTaskVertexName());
+              Edge srcEdge = sourceVertices.get(edgeVertex);
+              if (srcEdge == null) {
+                throw new TezUncheckedException("Bad source vertex: " +
+                    sourceMeta.getTaskVertexName() + " for destination vertex: " +
+                    getLogIdentifier());
+              }
+              if (srcEdge.hasOnDemandRouting()) {
                 onDemandRouteEventsWriteLock.lock();
                 try {
                   onDemandRouteEvents.add(new EventInfo(tezEvent, srcEdge, srcTaskIndex));
@@ -4223,13 +4213,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
                 }
               } else {
                 // send to tasks            
-                Edge srcEdge = sourceVertices.get(getDAG().getVertex(
-                    sourceMeta.getTaskVertexName()));
-                if (srcEdge == null) {
-                  throw new TezUncheckedException("Bad source vertex: "
-                      + sourceMeta.getTaskVertexName() + " for destination vertex: "
-                      + getLogIdentifier());
-                }
                 srcEdge.sendTezEventToDestinationTasks(tezEvent);
               }
             }
@@ -4246,13 +4229,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
           // removed.
           pendingTaskEvents.add(tezEvent);          
         } else {
-          if (useOnDemandRouting) {
-            onDemandRouteEvents.add(new EventInfo(tezEvent, null, -1));
-          } else {
-            InputDataInformationEvent riEvent = (InputDataInformationEvent) tezEvent.getEvent();
-            Task targetTask = getTask(riEvent.getTargetIndex());
-            targetTask.registerTezEvent(tezEvent);
-          }
+          InputDataInformationEvent riEvent = (InputDataInformationEvent) tezEvent.getEvent();
+          Task targetTask = getTask(riEvent.getTargetIndex());
+          targetTask.registerTezEvent(tezEvent);
         }
       }
         break;

http://git-wip-us.apache.org/repos/asf/tez/blob/41173aa0/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 5cd487c..08f6ff6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -162,6 +162,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
       ContainerLaunchContext launchContext;
       int numUpdates = 0;
       int nextFromEventId = 0;
+      int nextPreRoutedFromEventId = 0;
       boolean completed;
       String cIdStr;
       AtomicBoolean remove = new AtomicBoolean(false);
@@ -184,6 +185,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
         launchContext = null;
         numUpdates = 0;
         nextFromEventId = 0;
+        nextPreRoutedFromEventId = 0;
         cIdStr = null;
         remove.set(false);
       }
@@ -338,7 +340,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
       if (response.shouldDie()) {
         cData.remove();
       } else {
-        cData.nextFromEventId += response.getNextFromEventId();
+        cData.nextFromEventId = response.getNextFromEventId();
+        cData.nextPreRoutedFromEventId = response.getNextPreRoutedEventId();
         if (!response.getEvents().isEmpty()) {
           long stopTime = System.nanoTime();
           long stopCpuTime = threadMxBean.getCurrentThreadCpuTime();
@@ -421,7 +424,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
               events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData(
                   EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)));
               TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
-                  cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
+                  cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
               doHeartbeat(request, cData);
             } else if (version != null && cData.taId.getId() <= version.intValue()) {
               preemptContainer(cData);
@@ -432,7 +435,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
                   new TaskAttemptCompletedEvent(), new EventMetaData(
                       EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)));
               TezHeartbeatRequest request = new TezHeartbeatRequest(++cData.numUpdates, events,
-                  cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
+                  cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
               doHeartbeat(request, cData);
               cData.clear();
             }

http://git-wip-us.apache.org/repos/asf/tez/blob/41173aa0/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 1e7faf9..4137d42 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -253,8 +253,8 @@ public class TestMockDAGAppMaster {
     Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
     VertexImpl vImpl = (VertexImpl) dagImpl.getVertex(vB.getName());
     TaskImpl tImpl = (TaskImpl) vImpl.getTask(1);
-    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 1);
-    List<TezEvent> tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 1000).getEvents();
+    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
+    List<TezEvent> tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 0, 1000).getEvents();
     Assert.assertEquals(2, tEvents.size()); // 2 from vA
     Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
     Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
@@ -267,8 +267,8 @@ public class TestMockDAGAppMaster {
         (targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0));
     vImpl = (VertexImpl) dagImpl.getVertex(vC.getName());
     tImpl = (TaskImpl) vImpl.getTask(1);
-    taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 1);
-    tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 1000).getEvents();
+    taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
+    tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 0, 1000).getEvents();
     Assert.assertEquals(2, tEvents.size()); // 2 from vA
     Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
     Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
@@ -281,8 +281,8 @@ public class TestMockDAGAppMaster {
         (targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0));
     vImpl = (VertexImpl) dagImpl.getVertex(vD.getName());
     tImpl = (TaskImpl) vImpl.getTask(1);
-    taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 1);
-    tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 1000).getEvents();
+    taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
+    tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 0, 1000).getEvents();
     Assert.assertEquals(1, tEvents.size()); // 1 from vA
     Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
     Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex());
@@ -390,22 +390,19 @@ public class TestMockDAGAppMaster {
     Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
     // vC uses on demand routing and its task does not provide events
     VertexImpl vImpl = (VertexImpl) dagImpl.getVertex(vC.getName());
-    Assert.assertEquals(true, vImpl.useOnDemandRouting);
     TaskImpl tImpl = (TaskImpl) vImpl.getTask(0);
     TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
     Assert.assertEquals(0, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
-    // vD is mixed more and does not use on demand routing and its task provides events
+    // vD is mixed mode and only 1 out of 2 edges does legacy routing with task providing events
     vImpl = (VertexImpl) dagImpl.getVertex(vD.getName());
-    Assert.assertEquals(false, vImpl.useOnDemandRouting);
     tImpl = (TaskImpl) vImpl.getTask(0);
     taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
-    Assert.assertEquals(2, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
+    Assert.assertEquals(1, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
     // vE has single legacy edge and does not use on demand routing and its task provides events
-    vImpl = (VertexImpl) dagImpl.getVertex(vD.getName());
-    Assert.assertEquals(false, vImpl.useOnDemandRouting);
+    vImpl = (VertexImpl) dagImpl.getVertex(vE.getName());
     tImpl = (TaskImpl) vImpl.getTask(0);
     taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
-    Assert.assertEquals(2, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
+    Assert.assertEquals(1, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
 
     tezClient.stop();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/41173aa0/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index db8eff1..ac816f4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -243,8 +243,8 @@ public class TestTaskAttemptListenerImplTezDag {
 
     doReturn(vertex).when(dag).getVertex(vertexID);
     doReturn("test_vertex").when(vertex).getName();
-    TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(nextFromEventId, sendEvents);
-    doReturn(eventInfo).when(vertex).getTaskAttemptTezEvents(taskAttemptID, fromEventId, maxEvents);
+    TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(nextFromEventId, sendEvents, 0);
+    doReturn(eventInfo).when(vertex).getTaskAttemptTezEvents(taskAttemptID, fromEventId, 0, maxEvents);
 
     taskAttemptListener.registerRunningContainer(containerId);
     taskAttemptListener.registerTaskAttempt(amContainerTask, containerId);

http://git-wip-us.apache.org/repos/asf/tez/blob/41173aa0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index fff95b5..4787247 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.MockDNSToSwitchMapping;
 import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPlugin;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
@@ -115,7 +116,6 @@ import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
-import org.apache.tez.dag.app.dag.impl.TestDAGImpl.CustomizedEdgeManager.ExceptionLocation;
 import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
 import org.apache.tez.dag.app.rm.AMSchedulerEvent;
 import org.apache.tez.dag.app.rm.AMSchedulerEventType;
@@ -670,8 +670,8 @@ public class TestDAGImpl {
   }
 
   // v1 -> v2
-  private DAGPlan createDAGWithCustomEdge(ExceptionLocation exLocation) {
-    LOG.info("Setting up dag plan");
+  private DAGPlan createDAGWithCustomEdge(ExceptionLocation exLocation, boolean useLegacy) {
+    LOG.info("Setting up custome edge dag plan " + exLocation + " " + useLegacy);
     DAGPlan dag = DAGPlan.newBuilder()
         .setName("testverteximpl")
         .addVertex(
@@ -723,7 +723,8 @@ public class TestDAGImpl {
          .addEdge(
              EdgePlan.newBuilder()
                  .setEdgeManager(TezEntityDescriptorProto.newBuilder()
-                         .setClassName(CustomizedEdgeManager.class.getName())
+                         .setClassName(useLegacy ? CustomizedEdgeManagerLegacy.class.getName() :
+                           CustomizedEdgeManager.class.getName())
                          .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
                             .setUserPayload(ByteString.copyFromUtf8(exLocation.name())))
                  )
@@ -866,8 +867,12 @@ public class TestDAGImpl {
   }
 
   private void setupDAGWithCustomEdge(ExceptionLocation exLocation) {
+    setupDAGWithCustomEdge(exLocation, false);
+  }
+  
+  private void setupDAGWithCustomEdge(ExceptionLocation exLocation, boolean useLegacy) {
     dagWithCustomEdgeId =  TezDAGID.getInstance(appAttemptId.getApplicationId(), 4);
-    dagPlanWithCustomEdge = createDAGWithCustomEdge(exLocation);
+    dagPlanWithCustomEdge = createDAGWithCustomEdge(exLocation, useLegacy);
     dagWithCustomEdgeAppContext = mock(AppContext.class);
     doReturn(aclManager).when(dagWithCustomEdgeAppContext).getAMACLManager();
     dagWithCustomEdge = new DAGImpl(dagWithCustomEdgeId, conf, dagPlanWithCustomEdge,
@@ -1018,7 +1023,7 @@ public class TestDAGImpl {
         new EventMetaData(EventProducerConsumerType.INPUT, "vertex1", "vertex2", ta1.getID()));
     dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
     dispatcher.await();
-    v2.getTaskAttemptTezEvents(ta1.getID(), 0, 1000);
+    v2.getTaskAttemptTezEvents(ta1.getID(), 0, 0, 1000);
     dispatcher.await();
 
     Assert.assertEquals(VertexState.FAILED, v2.getState());
@@ -1031,7 +1036,7 @@ public class TestDAGImpl {
   @Test(timeout = 5000)
   public void testEdgeManager_RouteDataMovementEventToDestinationWithLegacyRouting() {
     // Remove after legacy routing is removed
-    setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination);
+    setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination, true);
     dispatcher.getEventHandler().handle(
         new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
     dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(), 
@@ -1041,8 +1046,7 @@ public class TestDAGImpl {
 
     VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
     VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
-    v1.useOnDemandRouting = false;
-    v2.useOnDemandRouting = false;
+
     dispatcher.await();
     Task t1= v2.getTask(0);
     TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
@@ -1063,7 +1067,7 @@ public class TestDAGImpl {
   @Test(timeout = 5000)
   public void testEdgeManager_RouteInputSourceTaskFailedEventToDestinationLegacyRouting() {
     // Remove after legacy routing is removed
-    setupDAGWithCustomEdge(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination);
+    setupDAGWithCustomEdge(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination, true);
     dispatcher.getEventHandler().handle(
         new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
     dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(), 
@@ -1073,8 +1077,6 @@ public class TestDAGImpl {
 
     VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
     VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
-    v1.useOnDemandRouting = false;
-    v2.useOnDemandRouting = false;
     dispatcher.await();
 
     Task t1= v2.getTask(0);
@@ -1084,7 +1086,7 @@ public class TestDAGImpl {
         new EventMetaData(EventProducerConsumerType.INPUT,"vertex1", "vertex2", ta1.getID()));
     dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
     dispatcher.await();
-    v2.getTaskAttemptTezEvents(ta1.getID(), 0, 1000);
+    v2.getTaskAttemptTezEvents(ta1.getID(), 0, 0, 1000);
     dispatcher.await();
     Assert.assertEquals(VertexState.FAILED, v2.getState());
     
@@ -1802,18 +1804,17 @@ public class TestDAGImpl {
         TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT));
   }
 
-  public static class CustomizedEdgeManager extends EdgeManagerPluginOnDemand {
-
-    public static enum ExceptionLocation {
-      Initialize,
-      GetNumDestinationTaskPhysicalInputs,
-      GetNumSourceTaskPhysicalOutputs,
-      RouteDataMovementEventToDestination,
-      RouteInputSourceTaskFailedEventToDestination,
-      GetNumDestinationConsumerTasks,
-      RouteInputErrorEventToSource
-    }
+  public static enum ExceptionLocation {
+    Initialize,
+    GetNumDestinationTaskPhysicalInputs,
+    GetNumSourceTaskPhysicalOutputs,
+    RouteDataMovementEventToDestination,
+    RouteInputSourceTaskFailedEventToDestination,
+    GetNumDestinationConsumerTasks,
+    RouteInputErrorEventToSource
+  }
 
+  public static class CustomizedEdgeManagerLegacy extends EdgeManagerPlugin {
     private ExceptionLocation exLocation;
 
     public static EdgeManagerPluginDescriptor getUserPayload(ExceptionLocation exLocation) {
@@ -1821,7 +1822,7 @@ public class TestDAGImpl {
         .setUserPayload(UserPayload.create(ByteBuffer.wrap(exLocation.name().getBytes())));
     }
 
-    public CustomizedEdgeManager(EdgeManagerPluginContext context) {
+    public CustomizedEdgeManagerLegacy(EdgeManagerPluginContext context) {
       super(context);
       this.exLocation = ExceptionLocation.valueOf(
           new String(context.getUserPayload().deepCopyAsArray()));
@@ -1890,6 +1891,55 @@ public class TestDAGImpl {
       }
       return 0;
     }
+  }
+
+  public static class CustomizedEdgeManager extends EdgeManagerPluginOnDemand {
+    private ExceptionLocation exLocation;
+
+    public static EdgeManagerPluginDescriptor getUserPayload(ExceptionLocation exLocation) {
+      return EdgeManagerPluginDescriptor.create(CustomizedEdgeManager.class.getName())
+        .setUserPayload(UserPayload.create(ByteBuffer.wrap(exLocation.name().getBytes())));
+    }
+
+    public CustomizedEdgeManager(EdgeManagerPluginContext context) {
+      super(context);
+      this.exLocation = ExceptionLocation.valueOf(
+          new String(context.getUserPayload().deepCopyAsArray()));
+    }
+
+    @Override
+    public void initialize() throws Exception {
+      if (exLocation == ExceptionLocation.Initialize) {
+        throw new Exception(exLocation.name());
+      }
+    }
+
+    @Override
+    public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex)
+        throws Exception {
+      if (exLocation == ExceptionLocation.GetNumDestinationTaskPhysicalInputs) {
+        throw new Exception(exLocation.name());
+      }
+      return 0;
+    }
+
+    @Override
+    public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex)
+        throws Exception {
+      if (exLocation == ExceptionLocation.GetNumSourceTaskPhysicalOutputs) {
+        throw new Exception(exLocation.name());
+      }
+      return 0;
+    }
+
+    @Override
+    public int getNumDestinationConsumerTasks(int sourceTaskIndex)
+        throws Exception {
+      if (exLocation == ExceptionLocation.GetNumDestinationConsumerTasks) {
+        throw new Exception(exLocation.name());
+      }
+      return 0;
+    }
 
     @Override
     public int routeInputErrorEventToSource(int destinationTaskIndex,

http://git-wip-us.apache.org/repos/asf/tez/blob/41173aa0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 6c94465..e569949 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2493,7 +2493,13 @@ public class TestVertexImpl {
     // verify all events have been put in pending.
     // this is not necessary after legacy routing has been removed
     Assert.assertEquals(5, v4.pendingTaskEvents.size());
-    v4.scheduleTasks(new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>());
+    List<TaskWithLocationHint> taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+    // scheduling start to trigger edge routing to begin
+    for (int i=0; i<v4.getTotalTasks(); ++i) {
+      taskList.add(new TaskWithLocationHint(i, null));
+    }
+    v4.scheduleTasks(taskList);
+    dispatcher.await();
     // verify all events have been moved to taskEvents
     Assert.assertEquals(5, v4.getOnDemandRouteEvents().size());
     for (int i=5; i<11; ++i) {
@@ -2511,7 +2517,7 @@ public class TestVertexImpl {
     e.edgeManager = mockPlugin;
     // source task id will not match. all events will return null
     when(mockPlugin.routeDataMovementEventToDestination(1, 0, 0)).thenReturn(mockRoute);
-    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, 0, 1);
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, 0, 0, 1);
     Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
     Assert.assertEquals(0, eventInfo.getEvents().size()); // no events
     
@@ -2522,24 +2528,24 @@ public class TestVertexImpl {
         mockPlugin.routeDataMovementEventToDestination(anyInt(),
             anyInt(), anyInt())).thenReturn(mockRoute);
     for (int i=0; i<11; ++i) {
-      eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 1);
+      eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 1);
       fromEventId = eventInfo.getNextFromEventId();
       Assert.assertEquals((i+1), fromEventId);
       Assert.assertEquals(1, eventInfo.getEvents().size());
     }
-    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 1);
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 1);
     Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
     Assert.assertEquals(0, eventInfo.getEvents().size()); // no events
     
     // change max events to larger value. max events does not evenly divide total events
     fromEventId = 0;
     for (int i=1; i<=2; ++i) {
-      eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 5);
+      eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
       fromEventId = eventInfo.getNextFromEventId();
       Assert.assertEquals((i*5), fromEventId);
       Assert.assertEquals(5, eventInfo.getEvents().size());
     }
-    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 5);
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
     Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
     Assert.assertEquals(1, eventInfo.getEvents().size()); // remainder events
     
@@ -2551,12 +2557,12 @@ public class TestVertexImpl {
     fromEventId = 0;
     int lastFromEventId = 0;
     for (int i=1; i<=4; ++i) {
-      eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 5);
+      eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
       fromEventId = eventInfo.getNextFromEventId();
       Assert.assertEquals((i%2 > 0 ? (lastFromEventId+=2) : (lastFromEventId+=3)), fromEventId);     
       Assert.assertEquals(5, eventInfo.getEvents().size());
     }
-    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 5);
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
     Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
     Assert.assertEquals(2, eventInfo.getEvents().size()); // remainder events
   }
@@ -2754,15 +2760,12 @@ public class TestVertexImpl {
   }
   
   @Test(timeout = 5000)
-  public void testVertexPendingTaskEventsLegacyRouting() {
+  public void testVertexPendingTaskEvents() {
     // Remove after bulk routing API is removed
     initAllVertices(VertexState.INITED);
     VertexImpl v3 = vertices.get("vertex3");
     VertexImpl v2 = vertices.get("vertex2");
     VertexImpl v1 = vertices.get("vertex1");
-    v1.useOnDemandRouting = false;
-    v2.useOnDemandRouting = false;
-    v3.useOnDemandRouting = false;
     
     startVertex(v1);
     
@@ -4733,14 +4736,19 @@ public class TestVertexImpl {
       Assert.assertEquals(1, inputSpecs.get(0).getPhysicalEdgeCount());
     }
     
-    // fake scheduling start to trigger edge routing to begin
-    v1.scheduleTasks(new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>());
+    List<TaskWithLocationHint> taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+    // scheduling start to trigger edge routing to begin
+    for (int i=0; i<v1.getTotalTasks(); ++i) {
+      taskList.add(new TaskWithLocationHint(i, null));
+    }
+    v1.scheduleTasks(taskList);
+    dispatcher.await();
     // check all tasks get their events
     for (int i=0; i<v1.getTotalTasks(); ++i) {
       Assert.assertEquals(
           1,
           v1.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance(v1.getTask(i).getTaskId(), 0),
-              0, 100).getEvents().size());
+              0, 0, 100).getEvents().size());
     }
     
     VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager) vertices.get("vertex2");
@@ -4775,14 +4783,20 @@ public class TestVertexImpl {
     }
     Assert.assertEquals(true, initializerManager2.hasShutDown);
     
-    // fake scheduling start to trigger edge routing to begin
-    v2.scheduleTasks(new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>());
+    // scheduling start to trigger edge routing to begin
+    taskList = new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+    // scheduling start to trigger edge routing to begin
+    for (int i=0; i<v2.getTotalTasks(); ++i) {
+      taskList.add(new TaskWithLocationHint(i, null));
+    }
+    v2.scheduleTasks(taskList);
+    dispatcher.await();
     // check all tasks get their events
     for (int i=0; i<v2.getTotalTasks(); ++i) {
       Assert.assertEquals(
           ((i==0) ? 2 : 1),
           v2.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance(v2.getTask(i).getTaskId(), 0),
-              0, 100).getEvents().size());
+              0, 0, 100).getEvents().size());
     }
     for (int i = 0; i < 10; i++) {
       List<InputSpec> inputSpecs = v1.getInputSpecList(i);
@@ -4801,7 +4815,6 @@ public class TestVertexImpl {
 
     VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices
         .get("vertex1");
-    v1.useOnDemandRouting = false;
     dispatcher.getEventHandler().handle(
         new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
     dispatcher.await();
@@ -4828,7 +4841,6 @@ public class TestVertexImpl {
     
     VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager) vertices.get("vertex2");
     Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
-    v2.useOnDemandRouting = false;
 
     // non-task events don't get buffered
     List<TezEvent> events = Lists.newLinkedList();

http://git-wip-us.apache.org/repos/asf/tez/blob/41173aa0/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 921095c..17d7053 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -50,6 +50,7 @@ public abstract class RuntimeTask {
   protected final TezUmbilical tezUmbilical;
   protected final AtomicInteger eventCounter;
   protected final AtomicInteger nextFromEventId;
+  protected final AtomicInteger nextPreRoutedEventId;
   private final AtomicBoolean taskDone;
   private final TaskCounterUpdater counterUpdater;
   private final TaskStatistics statistics;
@@ -62,6 +63,7 @@ public abstract class RuntimeTask {
     this.tezCounters = new TezCounters();
     this.eventCounter = new AtomicInteger(0);
     this.nextFromEventId = new AtomicInteger(0);
+    this.nextPreRoutedEventId = new AtomicInteger(0);
     this.progress = 0.0f;
     this.taskDone = new AtomicBoolean(false);
     this.statistics = new TaskStatistics();
@@ -137,9 +139,17 @@ public abstract class RuntimeTask {
     return nextFromEventId.get();
   }
   
+  public int getNextPreRoutedEventId() {
+    return nextPreRoutedEventId.get();
+  }
+  
   public void setNextFromEventId(int nextFromEventId) {
     this.nextFromEventId.set(nextFromEventId);
   }
+  
+  public void setNextPreRoutedEventId(int nextPreRoutedEventId) {
+    this.nextPreRoutedEventId.set(nextPreRoutedEventId);
+  }
 
   public boolean isTaskDone() {
     return taskDone.get();

http://git-wip-us.apache.org/repos/asf/tez/blob/41173aa0/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java
index 3baef93..7ed89f8 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatRequest.java
@@ -36,6 +36,7 @@ public class TezHeartbeatRequest implements Writable {
   private List<TezEvent> events;
   private TezTaskAttemptID currentTaskAttemptID;
   private int startIndex;
+  private int preRoutedStartIndex;
   private int maxEvents;
   private long requestId;
 
@@ -43,12 +44,13 @@ public class TezHeartbeatRequest implements Writable {
   }
 
   public TezHeartbeatRequest(long requestId, List<TezEvent> events,
-      String containerIdentifier, TezTaskAttemptID taskAttemptID,
-      int startIndex, int maxEvents) {
+      int preRoutedStartIndex, String containerIdentifier,
+      TezTaskAttemptID taskAttemptID, int startIndex, int maxEvents) {
     this.containerIdentifier = containerIdentifier;
     this.requestId = requestId;
     this.events = Collections.unmodifiableList(events);
     this.startIndex = startIndex;
+    this.preRoutedStartIndex = preRoutedStartIndex;
     this.maxEvents = maxEvents;
     this.currentTaskAttemptID = taskAttemptID;
   }
@@ -65,6 +67,10 @@ public class TezHeartbeatRequest implements Writable {
     return startIndex;
   }
 
+  public int getPreRoutedStartIndex() {
+    return preRoutedStartIndex;
+  }
+
   public int getMaxEvents() {
     return maxEvents;
   }
@@ -95,6 +101,7 @@ public class TezHeartbeatRequest implements Writable {
       out.writeBoolean(false);
     }
     out.writeInt(startIndex);
+    out.writeInt(preRoutedStartIndex);
     out.writeInt(maxEvents);
     out.writeLong(requestId);
     Text.writeString(out, containerIdentifier);
@@ -117,6 +124,7 @@ public class TezHeartbeatRequest implements Writable {
       currentTaskAttemptID = null;
     }
     startIndex = in.readInt();
+    preRoutedStartIndex = in.readInt();
     maxEvents = in.readInt();
     requestId = in.readLong();
     containerIdentifier = Text.readString(in);
@@ -128,6 +136,7 @@ public class TezHeartbeatRequest implements Writable {
         + " containerId=" + containerIdentifier
         + ", requestId=" + requestId
         + ", startIndex=" + startIndex
+        + ", preRoutedStartIndex=" + preRoutedStartIndex
         + ", maxEventsToGet=" + maxEvents
         + ", taskAttemptId=" + currentTaskAttemptID
         + ", eventCount=" + (events != null ? events.size() : 0)

http://git-wip-us.apache.org/repos/asf/tez/blob/41173aa0/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
index cecc706..0aa4db4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
@@ -33,6 +33,7 @@ public class TezHeartbeatResponse implements Writable {
   private boolean shouldDie = false;
   private List<TezEvent> events;
   private int nextFromEventId;
+  private int nextPreRoutedEventId;
 
   public TezHeartbeatResponse() {
   }
@@ -57,6 +58,10 @@ public class TezHeartbeatResponse implements Writable {
     return nextFromEventId;
   }
 
+  public int getNextPreRoutedEventId() {
+    return nextPreRoutedEventId;
+  }
+
   public void setEvents(List<TezEvent> events) {
     this.events = Collections.unmodifiableList(events);
   }
@@ -73,11 +78,16 @@ public class TezHeartbeatResponse implements Writable {
     this.nextFromEventId = nextFromEventId;
   }
 
+  public void setNextPreRoutedEventId(int nextPreRoutedEventId) {
+    this.nextPreRoutedEventId = nextPreRoutedEventId;
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeLong(lastRequestId);
     out.writeBoolean(shouldDie);
     out.writeInt(nextFromEventId);
+    out.writeInt(nextPreRoutedEventId);
     if(events != null) {
       out.writeBoolean(true);
       out.writeInt(events.size());
@@ -94,6 +104,7 @@ public class TezHeartbeatResponse implements Writable {
     lastRequestId = in.readLong();
     shouldDie = in.readBoolean();
     nextFromEventId = in.readInt();
+    nextPreRoutedEventId = in.readInt();
     if(in.readBoolean()) {
       int eventCount = in.readInt();
       events = new ArrayList<TezEvent>(eventCount);
@@ -111,6 +122,7 @@ public class TezHeartbeatResponse implements Writable {
         + " lastRequestId=" + lastRequestId
         + ", shouldDie=" + shouldDie
         + ", nextFromEventId=" + nextFromEventId
+        + ", nextPreRoutedEventId=" + nextPreRoutedEventId
         + ", eventCount=" + (events != null ? events.size() : 0)
         + " }";
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/41173aa0/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 8b9db16..d9a7786 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -249,8 +249,9 @@ public class TaskReporter {
 
       long requestId = requestCounter.incrementAndGet();
       int fromEventId = task.getNextFromEventId();
-      TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr,
-          task.getTaskAttemptID(), fromEventId, maxEventsToGet);
+      int fromPreRoutedEventId = task.getNextPreRoutedEventId();
+      TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId,
+          containerIdStr, task.getTaskAttemptID(), fromEventId, maxEventsToGet);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Sending heartbeat to AM, request=" + request);
       }
@@ -282,6 +283,7 @@ public class TaskReporter {
         }
       } else {
         task.setNextFromEventId(response.getNextFromEventId());
+        task.setNextPreRoutedEventId(response.getNextPreRoutedEventId());
         if (response.getEvents() != null && !response.getEvents().isEmpty()) {
           LOG.info("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
               + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size()