You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:42:51 UTC

[13/43] tez git commit: TEZ-776. Reduce AM mem usage caused by storing TezEvents (bikas)

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 5d61642..a16ee0a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -91,6 +91,7 @@ import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
+import org.apache.tez.dag.app.TaskAttemptEventInfo;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
@@ -134,6 +135,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
+import org.apache.tez.dag.app.dag.impl.Edge.PendingEventRouteMetadata;
 import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEvent;
@@ -202,7 +204,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   //final fields
   private final Clock clock;
 
-
   private final Lock readLock;
   private final Lock writeLock;
   private final TaskAttemptListener taskAttemptListener;
@@ -225,6 +226,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   private Configuration vertexConf;
   
   private final boolean isSpeculationEnabled;
+  
+  @VisibleForTesting
+  public boolean useOnDemandRouting = true;
 
   //fields initialized in init
 
@@ -726,9 +730,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   private Set<String> inputsWithInitializers;
   private int numInitializedInputs;
   private boolean startSignalPending = false;
-  private boolean tasksNotYetScheduled = true;
   // We may always store task events in the vertex for scalability
   List<TezEvent> pendingTaskEvents = Lists.newLinkedList();
+  private boolean tasksNotYetScheduled = true;
+  // onDemandRouteEvents must be a random access structure: it is read by index
+  
+  private final List<EventInfo> onDemandRouteEvents = Lists.newArrayListWithCapacity(1000);
+  private final ReadWriteLock onDemandRouteEventsReadWriteLock = new ReentrantReadWriteLock();
+  private final Lock onDemandRouteEventsReadLock = onDemandRouteEventsReadWriteLock.readLock();
+  private final Lock onDemandRouteEventsWriteLock = onDemandRouteEventsReadWriteLock.writeLock();
+  
+  private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
+      new ArrayList(0);
   List<TezEvent> pendingRouteEvents = new LinkedList<TezEvent>();
   List<TezTaskAttemptID> pendingReportedSrcCompletions = Lists.newLinkedList();
 
@@ -771,6 +784,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   private VertexStats vertexStats = null;
 
   private final TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOpts;
+  
+  static class EventInfo {
+    final TezEvent tezEvent;
+    final Edge eventEdge;
+    final int eventTaskIndex;
+    EventInfo(TezEvent tezEvent, Edge eventEdge, int eventTaskIndex) {
+      this.tezEvent = tezEvent;
+      this.eventEdge = eventEdge;
+      this.eventTaskIndex = eventTaskIndex;
+    }
+  }
 
   private VertexStatisticsImpl finalStatistics;
 
@@ -1175,6 +1199,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     }
   }
 
+  @VisibleForTesting
+  List<EventInfo> getOnDemandRouteEvents() {
+    return onDemandRouteEvents;
+  }
+  
   private void computeProgress() {
     this.readLock.lock();
     try {
@@ -1388,24 +1417,51 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     }
   }
   
-  @Override
-  public void scheduleTasks(List<TaskWithLocationHint> tasksToSchedule) {
-    writeLock.lock();
-    try {
+  void setupEdgeRouting() throws AMUserCodeException {
+    for (Edge e : sourceVertices.values()) {
+      boolean edgeDoingOnDemand = e.routingToBegin();
+      if (useOnDemandRouting && !edgeDoingOnDemand) {
+        useOnDemandRouting = false;
+        LOG.info("Not using ondemand routing because of edge between " + e.getSourceVertexName()
+            + " and " + getLogIdentifier());
+      }
+    }
+  }
+  
+  private void unsetTasksNotYetScheduled() throws AMUserCodeException {
+    if (tasksNotYetScheduled) {
+      setupEdgeRouting();
       tasksNotYetScheduled = false;
+      // Only now can we be sure of the edge manager type, so until now we
+      // accumulate pending task events in case legacy routing gets used.
+      // This is only needed to support mixed mode routing. With on-demand
+      // routing alone, events could be stored directly when they arrive in
+      // handleRoutedTezEvents instead of first being cached in
+      // pendingTaskEvents. Once legacy routing is removed, pendingTaskEvents
+      // can be removed too.
       if (!pendingTaskEvents.isEmpty()) {
         LOG.info("Routing pending task events for vertex: " + logIdentifier);
         try {
-          handleRoutedTezEvents(this, pendingTaskEvents, false, true);
+          handleRoutedTezEvents(pendingTaskEvents, false, true);
         } catch (AMUserCodeException e) {
-          String msg = "Exception in " + e.getSource() +", vertex=" + logIdentifier;
+          String msg = "Exception in " + e.getSource() + ", vertex=" + logIdentifier;
           LOG.error(msg, e);
-          addDiagnostic(msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause()));
-          eventHandler.handle(new VertexEventTermination(vertexId, VertexTerminationCause.AM_USERCODE_FAILURE));
+          addDiagnostic(msg + ", " + e.getMessage() + ", "
+              + ExceptionUtils.getStackTrace(e.getCause()));
+          eventHandler.handle(new VertexEventTermination(vertexId,
+              VertexTerminationCause.AM_USERCODE_FAILURE));
           return;
         }
         pendingTaskEvents.clear();
       }
+    }
+  }
+  
+  @Override
+  public void scheduleTasks(List<TaskWithLocationHint> tasksToSchedule) {
+    writeLock.lock();
+    try {
+      unsetTasksNotYetScheduled();
       for (TaskWithLocationHint task : tasksToSchedule) {
         if (numTasks <= task.getTaskIndex().intValue()) {
           throw new TezUncheckedException(
@@ -1422,6 +1478,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
             TezTaskID.getInstance(vertexId, task.getTaskIndex().intValue()),
             TaskEventType.T_SCHEDULE));
       }
+    } catch (AMUserCodeException e) {
+      String msg = "Exception in " + e.getSource() + ", vertex=" + getLogIdentifier();
+      LOG.error(msg, e);
+      // send event to fail the vertex
+      eventHandler.handle(new VertexEventManagerUserCodeError(getVertexId(), e));
+      // throw an unchecked exception to stop the vertex manager that invoked this.
+      throw new TezUncheckedException(e);
     } finally {
       writeLock.unlock();
     }
@@ -2497,7 +2560,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
           }
           break;
         case RUNNING:
-          vertex.tasksNotYetScheduled = false;
           try {
             vertex.initializeCommitters();
           } catch (Exception e) {
@@ -2530,6 +2592,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
             }
             try {
               vertex.recoveryCodeSimulatingStart();
+              vertex.unsetTasksNotYetScheduled();
               endState = VertexState.RUNNING;
             } catch (AMUserCodeException e) {
               String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
@@ -2560,7 +2623,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
                 VertexTerminationCause.COMMIT_FAILURE, msg);
             endState = VertexState.FAILED;
           } else {
-            vertex.tasksNotYetScheduled = false;
             // recover tasks
             if (vertex.tasks != null && vertex.numTasks != 0) {
               TaskState taskState = TaskState.KILLED;
@@ -2578,6 +2640,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
               }
               try {
                 vertex.recoveryCodeSimulatingStart();
+                vertex.unsetTasksNotYetScheduled();
                 endState = VertexState.RUNNING;
               } catch (AMUserCodeException e) {
                 String msg = "Exception in " + e.getSource() +", vertex:" + vertex.getLogIdentifier();
@@ -2901,7 +2964,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
           endState = VertexState.INITED;
           break;
         case RUNNING:
-          vertex.tasksNotYetScheduled = false;
           // if commit in progress and desired state is not a succeeded one,
           // move to failed
           if (vertex.recoveryCommitInProgress) {
@@ -2946,6 +3008,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
             }
             try {
               vertex.recoveryCodeSimulatingStart();
+              vertex.unsetTasksNotYetScheduled();
               endState = VertexState.RUNNING;
             } catch (AMUserCodeException e) {
               String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier();
@@ -2962,7 +3025,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         case SUCCEEDED:
         case FAILED:
         case KILLED:
-          vertex.tasksNotYetScheduled = false;
           // recover tasks
           assert vertex.tasks.size() == vertex.numTasks;
           if (vertex.tasks != null  && vertex.numTasks != 0) {
@@ -2982,6 +3044,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
             // Wait for all tasks to recover and report back
             try {
               vertex.recoveryCodeSimulatingStart();
+              vertex.unsetTasksNotYetScheduled();
               endState = VertexState.RUNNING;
             } catch (AMUserCodeException e) {
               String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
@@ -3025,7 +3088,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         vertex.recoveredEvents.clear();
         if (!vertex.pendingRouteEvents.isEmpty()) {
           try {
-            handleRoutedTezEvents(vertex, vertex.pendingRouteEvents, false, true);
+            vertex.handleRoutedTezEvents(vertex.pendingRouteEvents, false, true);
             vertex.pendingRouteEvents.clear();
           } catch (AMUserCodeException e) {
             String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier();
@@ -3284,7 +3347,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       List<TezEvent> inputInfoEvents = iEvent.getEvents();
       try {
         if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) {
-          VertexImpl.handleRoutedTezEvents(vertex, inputInfoEvents, false, false);
+          vertex.handleRoutedTezEvents(inputInfoEvents, false, false);
         }
       } catch (AMUserCodeException e) {
         String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
@@ -3941,7 +4004,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       boolean recovered = rEvent.isRecovered();
       List<TezEvent> tezEvents = rEvent.getEvents();
       try {
-        VertexImpl.handleRoutedTezEvents(vertex, tezEvents, recovered, false);
+        vertex.handleRoutedTezEvents(tezEvents, recovered, false);
       } catch (AMUserCodeException e) {
         String msg = "Exception in " + e.getSource() + ", vertex=" + vertex.getLogIdentifier();
         LOG.error(msg, e);
@@ -3959,16 +4022,105 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       return vertex.getState();
     }
   }
+  
+  @Override
+  public TaskAttemptEventInfo getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
+      int fromEventId, int maxEvents) {
+    if (!useOnDemandRouting) {
+      List<TezEvent> events = getTask(attemptID.getTaskID()).getTaskAttemptTezEvents(attemptID, fromEventId, maxEvents);
+      return new TaskAttemptEventInfo(fromEventId + events.size(), events);
+    }
 
-  private static void handleRoutedTezEvents(VertexImpl vertex, List<TezEvent> tezEvents, boolean recovered, boolean isPendingEvents) throws AMUserCodeException {
-    if (vertex.getAppContext().isRecoveryEnabled()
+    onDemandRouteEventsReadLock.lock();
+    try {
+      List<TezEvent> events = EMPTY_TASK_ATTEMPT_TEZ_EVENTS;
+      int nextFromEventId = fromEventId;
+      int currEventCount = onDemandRouteEvents.size();
+      try {
+        if (currEventCount > fromEventId) {
+          events = Lists.newArrayListWithCapacity(maxEvents);
+          int taskIndex = attemptID.getTaskID().getId();
+          Preconditions.checkState(taskIndex < tasks.size(), "Invalid task index for TA: " + attemptID
+              + " vertex: " + getLogIdentifier());
+          boolean isFirstEvent = true;
+          for (nextFromEventId = fromEventId; nextFromEventId < currEventCount; ++nextFromEventId) {
+            boolean earlyExit = false;
+            if (events.size() == maxEvents) {
+              break;
+            }
+            EventInfo eventInfo = onDemandRouteEvents.get(nextFromEventId);
+            TezEvent tezEvent = eventInfo.tezEvent;
+            switch(tezEvent.getEventType()) {
+            case INPUT_FAILED_EVENT:
+            case DATA_MOVEMENT_EVENT:
+            case COMPOSITE_DATA_MOVEMENT_EVENT:
+              {
+                int srcTaskIndex = eventInfo.eventTaskIndex;
+                Edge srcEdge = eventInfo.eventEdge;
+                PendingEventRouteMetadata pendingRoute = null;
+                if (isFirstEvent) {
+                  // do this precondition check only for the first event
+                  isFirstEvent = false;
+                  pendingRoute = srcEdge.removePendingEvents(attemptID);
+                  if (pendingRoute != null) {
+                    Preconditions.checkState(tezEvent == pendingRoute.getTezEvent()); // same object
+                  }
+                }
+                if (!srcEdge.maybeAddTezEventForDestinationTask(tezEvent, attemptID, srcTaskIndex,
+                    events, maxEvents, pendingRoute)) {
+                  // not enough space left for this iteration's events.
+                  // Exit and resume from this event on the next call.
+                  earlyExit = true;
+                }
+              }
+              break;
+            case ROOT_INPUT_DATA_INFORMATION_EVENT:
+              {
+                InputDataInformationEvent riEvent = (InputDataInformationEvent) tezEvent.getEvent();
+                if (riEvent.getTargetIndex() == taskIndex) {
+                  events.add(tezEvent);
+                }
+              }
+              break;
+            default:
+              throw new TezUncheckedException("Unexpected event type for task: "
+                  + tezEvent.getEventType());
+            }
+            if (earlyExit) {
+              break;
+            }
+          }
+        }
+      } catch (AMUserCodeException e) {
+        String msg = "Exception in " + e.getSource() + ", vertex=" + getLogIdentifier();
+        LOG.error(msg, e);
+        eventHandler.handle(new VertexEventManagerUserCodeError(getVertexId(), e));
+        nextFromEventId = fromEventId;
+        events.clear();
+      }
+  
+      if (events.size() > 0) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("Sending ").append(attemptID).append(" numEvents: ").append(events.size())
+        .append(" from: ").append(fromEventId).append(" to: ").append(nextFromEventId)
+        .append(" out of ").append(currEventCount).append(" events in vertex: ").append(getLogIdentifier());
+        LOG.info(builder.toString());
+      }
+      return new TaskAttemptEventInfo(nextFromEventId, events);
+    } finally {
+      onDemandRouteEventsReadLock.unlock();
+    }
+  }
+
+  private void handleRoutedTezEvents(List<TezEvent> tezEvents, boolean recovered, boolean isPendingEvents) throws AMUserCodeException {
+    if (getAppContext().isRecoveryEnabled()
         && !recovered
         && !isPendingEvents
         && !tezEvents.isEmpty()) {
       List<TezEvent> recoveryEvents =
           Lists.newArrayList();
       for (TezEvent tezEvent : tezEvents) {
-        if (!isEventFromVertex(vertex, tezEvent.getSourceInfo())) {
+        if (!isEventFromVertex(this, tezEvent.getSourceInfo())) {
           continue;
         }
         if  (tezEvent.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)
@@ -3980,15 +4132,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       }
       if (!recoveryEvents.isEmpty()) {
         VertexRecoverableEventsGeneratedEvent historyEvent =
-            new VertexRecoverableEventsGeneratedEvent(vertex.vertexId,
+            new VertexRecoverableEventsGeneratedEvent(vertexId,
                 recoveryEvents);
-        vertex.appContext.getHistoryHandler().handle(
-            new DAGHistoryEvent(vertex.getDAGId(), historyEvent));
+        appContext.getHistoryHandler().handle(
+            new DAGHistoryEvent(getDAGId(), historyEvent));
       }
     }
     for(TezEvent tezEvent : tezEvents) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Vertex: " + vertex.getName() + " routing event: "
+        LOG.debug("Vertex: " + getLogIdentifier() + " routing event: "
             + tezEvent.getEventType()
             + " Recovered:" + recovered);
       }
@@ -3998,7 +4150,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       case DATA_MOVEMENT_EVENT:
       case COMPOSITE_DATA_MOVEMENT_EVENT:
         {
-          if (isEventFromVertex(vertex, sourceMeta)) {
+          if (isEventFromVertex(this, sourceMeta)) {
             // event from this vertex. send to destination vertex
             TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
             if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
@@ -4008,56 +4160,86 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
             } else {
               ((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
             }
-            Vertex destVertex = vertex.getDAG().getVertex(sourceMeta.getEdgeVertexName());
-            Edge destEdge = vertex.targetVertices.get(destVertex);
+            Vertex destVertex = getDAG().getVertex(sourceMeta.getEdgeVertexName());
+            Edge destEdge = targetVertices.get(destVertex);
             if (destEdge == null) {
               throw new TezUncheckedException("Bad destination vertex: " +
                   sourceMeta.getEdgeVertexName() + " for event vertex: " +
-                  vertex.getLogIdentifier());
+                  getLogIdentifier());
             }
-            vertex.eventHandler.handle(new VertexEventRouteEvent(destVertex
+            eventHandler.handle(new VertexEventRouteEvent(destVertex
                 .getVertexId(), Collections.singletonList(tezEvent)));
           } else {
-            // event not from this vertex. must have come from source vertex.
-            // send to tasks
-            if (vertex.tasksNotYetScheduled) {
-              vertex.pendingTaskEvents.add(tezEvent);
+            if (tasksNotYetScheduled) {
+              // Cache the event until tasks are scheduled. This is only needed
+              // to support mixed mode routing; with on-demand routing alone,
+              // events could be stored directly. Once legacy routing is
+              // removed, pendingTaskEvents can be removed too.
+              pendingTaskEvents.add(tezEvent);
             } else {
-              Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
-                  sourceMeta.getTaskVertexName()));
-              if (srcEdge == null) {
-                throw new TezUncheckedException("Bad source vertex: " +
-                    sourceMeta.getTaskVertexName() + " for destination vertex: " +
-                    vertex.getLogIdentifier());
+              // event not from this vertex. must have come from source vertex.
+              if (useOnDemandRouting) {
+                int srcTaskIndex = sourceMeta.getTaskAttemptID().getTaskID().getId();
+                Vertex edgeVertex = getDAG().getVertex(sourceMeta.getTaskVertexName());
+                Edge srcEdge = sourceVertices.get(edgeVertex);
+                if (srcEdge == null) {
+                  throw new TezUncheckedException("Bad source vertex: " +
+                      sourceMeta.getTaskVertexName() + " for destination vertex: " +
+                      getLogIdentifier());
+                }
+                onDemandRouteEventsWriteLock.lock();
+                try {
+                  onDemandRouteEvents.add(new EventInfo(tezEvent, srcEdge, srcTaskIndex));
+                } finally {
+                  onDemandRouteEventsWriteLock.unlock();
+                }
+              } else {
+                // send to tasks            
+                Edge srcEdge = sourceVertices.get(getDAG().getVertex(
+                    sourceMeta.getTaskVertexName()));
+                if (srcEdge == null) {
+                  throw new TezUncheckedException("Bad source vertex: "
+                      + sourceMeta.getTaskVertexName() + " for destination vertex: "
+                      + getLogIdentifier());
+                }
+                srcEdge.sendTezEventToDestinationTasks(tezEvent);
               }
-              srcEdge.sendTezEventToDestinationTasks(tezEvent);
             }
           }
         }
         break;
       case ROOT_INPUT_DATA_INFORMATION_EVENT:
-        if (vertex.tasksNotYetScheduled) {
-          vertex.pendingTaskEvents.add(tezEvent);
+      {   
+        checkEventSourceMetadata(this, sourceMeta);
+        if (tasksNotYetScheduled) {
+          // Cache the event until tasks are scheduled. This is only needed
+          // to support mixed mode routing; with on-demand routing alone,
+          // events could be stored directly. Once legacy routing is
+          // removed, pendingTaskEvents can be removed too.
+          pendingTaskEvents.add(tezEvent);          
         } else {
-          checkEventSourceMetadata(vertex, sourceMeta);
-          InputDataInformationEvent riEvent = (InputDataInformationEvent) tezEvent
-              .getEvent();
-          Task targetTask = vertex.getTask(riEvent.getTargetIndex());
-          targetTask.registerTezEvent(tezEvent);
+          if (useOnDemandRouting) {
+            onDemandRouteEvents.add(new EventInfo(tezEvent, null, -1));
+          } else {
+            InputDataInformationEvent riEvent = (InputDataInformationEvent) tezEvent.getEvent();
+            Task targetTask = getTask(riEvent.getTargetIndex());
+            targetTask.registerTezEvent(tezEvent);
+          }
         }
+      }
         break;
       case VERTEX_MANAGER_EVENT:
       {
         // VM events on task success only can be changed as part of TEZ-1532
         VertexManagerEvent vmEvent = (VertexManagerEvent) tezEvent.getEvent();
-        Vertex target = vertex.getDAG().getVertex(vmEvent.getTargetVertexName());
+        Vertex target = getDAG().getVertex(vmEvent.getTargetVertexName());
         Preconditions.checkArgument(target != null,
             "Event sent to unkown vertex: " + vmEvent.getTargetVertexName());
-        if (target == vertex) {
-          vertex.vertexManager.onVertexManagerEventReceived(vmEvent);
+        if (target == this) {
+          vertexManager.onVertexManagerEventReceived(vmEvent);
         } else {
-          checkEventSourceMetadata(vertex, sourceMeta);
-          vertex.eventHandler.handle(new VertexEventRouteEvent(target
+          checkEventSourceMetadata(this, sourceMeta);
+          eventHandler.handle(new VertexEventRouteEvent(target
               .getVertexId(), Collections.singletonList(tezEvent)));
         }
       }
@@ -4065,45 +4247,46 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       case ROOT_INPUT_INITIALIZER_EVENT:
       {
         InputInitializerEvent riEvent = (InputInitializerEvent) tezEvent.getEvent();
-        Vertex target = vertex.getDAG().getVertex(riEvent.getTargetVertexName());
+        Vertex target = getDAG().getVertex(riEvent.getTargetVertexName());
         Preconditions.checkArgument(target != null,
             "Event sent to unknown vertex: " + riEvent.getTargetVertexName());
         riEvent.setSourceVertexName(tezEvent.getSourceInfo().getTaskVertexName());
-        if (target == vertex) {
-          if (vertex.rootInputDescriptors == null ||
-              !vertex.rootInputDescriptors.containsKey(riEvent.getTargetInputName())) {
+        if (target == this) {
+          if (rootInputDescriptors == null ||
+              !rootInputDescriptors.containsKey(riEvent.getTargetInputName())) {
             throw new TezUncheckedException(
                 "InputInitializerEvent targeted at unknown initializer on vertex " +
-                    vertex.logIdentifier + ", Event=" + riEvent);
+                    logIdentifier + ", Event=" + riEvent);
           }
-          if (vertex.getState() == VertexState.NEW) {
-            vertex.pendingInitializerEvents.add(tezEvent);
-          } else  if (vertex.getState() == VertexState.INITIALIZING) {
-            vertex.rootInputInitializerManager.handleInitializerEvents(Collections.singletonList(tezEvent));
+          if (getState() == VertexState.NEW) {
+            pendingInitializerEvents.add(tezEvent);
+          } else  if (getState() == VertexState.INITIALIZING) {
+            rootInputInitializerManager.handleInitializerEvents(Collections.singletonList(tezEvent));
           } else {
             // Currently, INITED and subsequent states means Initializer complete / failure
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Dropping event" + tezEvent + " since state is not INITIALIZING in " + vertex.getLogIdentifier() + ", state=" + vertex.getState());
+              LOG.debug("Dropping event" + tezEvent + " since state is not INITIALIZING in "
+                  + getLogIdentifier() + ", state=" + getState());
             }
           }
         } else {
-          checkEventSourceMetadata(vertex, sourceMeta);
-          vertex.eventHandler.handle(new VertexEventRouteEvent(target.getVertexId(),
+          checkEventSourceMetadata(this, sourceMeta);
+          eventHandler.handle(new VertexEventRouteEvent(target.getVertexId(),
               Collections.singletonList(tezEvent)));
         }
       }
         break;
       case INPUT_READ_ERROR_EVENT:
         {
-          checkEventSourceMetadata(vertex, sourceMeta);
-          Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
+          checkEventSourceMetadata(this, sourceMeta);
+          Edge srcEdge = sourceVertices.get(this.getDAG().getVertex(
               sourceMeta.getEdgeVertexName()));
           srcEdge.sendTezEventToSourceTasks(tezEvent);
         }
         break;
       case TASK_ATTEMPT_FAILED_EVENT:
         {
-          checkEventSourceMetadata(vertex, sourceMeta);
+          checkEventSourceMetadata(this, sourceMeta);
           TaskAttemptTerminationCause errCause = null;
           switch (sourceMeta.getEventGenerator()) {
           case INPUT:
@@ -4124,7 +4307,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
           }
           TaskAttemptFailedEvent taskFailedEvent =
               (TaskAttemptFailedEvent) tezEvent.getEvent();
-          vertex.getEventHandler().handle(
+          getEventHandler().handle(
               new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
                   TaskAttemptEventType.TA_FAILED,
                   "Error: " + taskFailedEvent.getDiagnostics(), 
@@ -4134,8 +4317,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         break;
       case TASK_ATTEMPT_COMPLETED_EVENT:
         {
-          checkEventSourceMetadata(vertex, sourceMeta);
-          vertex.getEventHandler().handle(
+          checkEventSourceMetadata(this, sourceMeta);
+          getEventHandler().handle(
               new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_DONE));
         }
         break;

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 18286b5..5cd487c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -92,8 +92,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
   MockContainerLauncher containerLauncher;
   boolean initFailFlag;
   boolean startFailFlag;
-  boolean sendDMEvents;
   boolean recoveryFatalError = false;
+  EventsDelegate eventsDelegate;
   CountersDelegate countersDelegate;
   StatisticsDelegate statsDelegate;
   long launcherSleepTime = 1;
@@ -112,6 +112,10 @@ public class MockDAGAppMaster extends DAGAppMaster {
   public static interface CountersDelegate {
     public TezCounters getCounters(TaskSpec taskSpec);
   }
+  
+  public static interface EventsDelegate {
+    public void getEvents(TaskSpec taskSpec, List<TezEvent> events);
+  }
 
   // mock container launcher does not launch real tasks.
   // Upon, launch of a container is simulates the container asking for tasks
@@ -334,7 +338,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
       if (response.shouldDie()) {
         cData.remove();
       } else {
-        cData.nextFromEventId += response.getEvents().size();
+        cData.nextFromEventId += response.getNextFromEventId();
         if (!response.getEvents().isEmpty()) {
           long stopTime = System.nanoTime();
           long stopCpuTime = threadMxBean.getCurrentThreadCpuTime();
@@ -400,19 +404,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
                 updatesToMake != null && cData.numUpdates < updatesToMake) {
               List<TezEvent> events = Lists.newArrayListWithCapacity(
                                       cData.taskSpec.getOutputs().size() + 1);
-              if (sendDMEvents) {
-                for (OutputSpec output : cData.taskSpec.getOutputs()) {
-                  if (output.getPhysicalEdgeCount() == 1) {
-                    events.add(new TezEvent(DataMovementEvent.create(0, 0, 0, null), new EventMetaData(
-                        EventProducerConsumerType.OUTPUT, cData.vName, output
-                            .getDestinationVertexName(), cData.taId)));
-                  } else {
-                    events.add(new TezEvent(CompositeDataMovementEvent.create(0,
-                        output.getPhysicalEdgeCount(), null), new EventMetaData(
-                        EventProducerConsumerType.OUTPUT, cData.vName, output
-                            .getDestinationVertexName(), cData.taId)));
-                  }
-                }
+              if (cData.numUpdates == 0 && eventsDelegate != null) {
+                eventsDelegate.getEvents(cData.taskSpec, events);
               }
               TezCounters counters = null;
               if (countersDelegate != null) {
@@ -428,7 +421,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
               events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData(
                   EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)));
               TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
-                  cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
+                  cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
               doHeartbeat(request, cData);
             } else if (version != null && cData.taId.getId() <= version.intValue()) {
               preemptContainer(cData);

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java
new file mode 100644
index 0000000..c277b38
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java
@@ -0,0 +1,219 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+// The objective of these tests is to make sure that large job simulations pass
+// within the memory limit set for the junit tests (1GB).
+// For larger jobs, increase the memory limit to account for the memory used by the
+// simulation code itself.
+public class TestMemoryWithEvents {
+  static Configuration defaultConf;
+  static FileSystem localFs;
+
+  static {
+    try {
+      defaultConf = new Configuration(false);
+      defaultConf.set("fs.defaultFS", "file:///");
+      defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+      localFs = FileSystem.getLocal(defaultConf);
+      String stagingDir = "target" + Path.SEPARATOR + TestMemoryWithEvents.class.getName() + "-tmpDir";
+      defaultConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir);
+      Logger.getRootLogger().setLevel(Level.WARN);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  // Passed to MockTezClient when the mock AM is created.
+  final int numThreads = 30;
+  // Task count passed to every Vertex.create call in the simulated DAGs.
+  final int numTasks = 10000;
+
+  /**
+   * Requests a GC, then prints heap usage and the heartbeat counters of the mock AM.
+   *
+   * @param name label printed in the report header
+   * @param mockApp mock AM whose heartbeat counters are reported
+   */
+  private void checkMemory(String name, MockDAGAppMaster mockApp) {
+    long mb = 1024*1024;
+    long microsPerMs = 1000;
+    Runtime runtime = Runtime.getRuntime();
+
+    System.out.println("##### Heap utilization statistics [MB] for " + name);
+    runtime.gc();
+    System.out.println("##### Used Memory:"
+        + (runtime.totalMemory() - runtime.freeMemory()) / mb);
+    System.out.println("##### Free Memory:" + runtime.freeMemory() / mb);
+    System.out.println("##### Total Memory:" + runtime.totalMemory() / mb);
+    System.out.println("##### Max Memory:" + runtime.maxMemory() / mb);
+
+    // Avoid dividing by zero when no heartbeat was counted.
+    // NOTE(review): the counters are divided by 1000 and labelled "(ms)" - confirm their unit.
+    long numHeartbeats = mockApp.numHearbeats.get();
+    if (numHeartbeats == 0) {
+      numHeartbeats = 1;
+    }
+    System.out.println("##### Heartbeat (ms) :"
+        + " latency avg: " + ((mockApp.heartbeatTime.get() / numHeartbeats) / microsPerMs)
+        + " cpu total: " + (mockApp.heartbeatCpu.get() / microsPerMs)
+        + " cpu avg: " + ((mockApp.heartbeatCpu.get() / numHeartbeats) / microsPerMs)
+        + " numHeartbeats: " + mockApp.numHearbeats.get());
+  }
+
+  /**
+   * Runs the DAG on a mock AM, asserts that it succeeds and prints the memory report.
+   *
+   * @param dag DAG to submit
+   * @param sendDMEvents whether to install the delegate that emits data movement events
+   */
+  private void testMemory(DAG dag, boolean sendDMEvents) throws Exception {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
+        null, false, false, numThreads, 1000);
+    tezClient.start();
+    try {
+      MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+      MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+      mockLauncher.startScheduling(false);
+      if (sendDMEvents) {
+        mockApp.eventsDelegate = new TestMockDAGAppMaster.TestEventsDelegate();
+      }
+      mockApp.doSleep = false;
+      DAGClient dagClient = tezClient.submitDAG(dag);
+      mockLauncher.waitTillContainersLaunched();
+      mockLauncher.startScheduling(true);
+      DAGStatus status = dagClient.waitForCompletion();
+      Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
+      checkMemory(dag.getName(), mockApp);
+    } finally {
+      // Stop the client even when the run or the assertion fails.
+      tezClient.stop();
+    }
+  }
+
+  /** Input initializer that creates one InputDataInformationEvent per task. */
+  public static class SimulationInitializer extends InputInitializer {
+    public SimulationInitializer(InputInitializerContext initializerContext) {
+      super(initializerContext);
+    }
+
+    @Override
+    public List<Event> initialize() throws Exception {
+      int numTasks = getContext().getNumTasks();
+      List<Event> events = Lists.newArrayListWithCapacity(numTasks);
+      for (int i=0; i<numTasks; ++i) {
+        events.add(InputDataInformationEvent.createWithSerializedPayload(i, null));
+      }
+      return events;
+    }
+
+    @Override
+    public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
+      // Intentionally a no-op.
+    }
+  }
+
+  /** Creates a DAG with vertices A and B connected by a single edge of the given type. */
+  private DAG createTwoVertexDag(String name, DataMovementType movementType) {
+    Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), numTasks);
+    Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), numTasks);
+    DAG dag = DAG.create(name);
+    dag.addVertex(vA)
+        .addVertex(vB)
+        .addEdge(
+            Edge.create(vA, vB, EdgeProperty.create(movementType,
+                DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+                OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
+    return dag;
+  }
+
+  /** Two unconnected vertices; A gets a data source initialized by SimulationInitializer. */
+  @Ignore
+  @Test (timeout = 600000)
+  public void testMemoryRootInputEvents() throws Exception {
+    DAG dag = DAG.create("testMemoryRootInputEvents");
+    Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), numTasks);
+    Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), numTasks);
+    vA.addDataSource(
+        "Input",
+        DataSourceDescriptor.create(InputDescriptor.create("In"),
+            InputInitializerDescriptor.create(SimulationInitializer.class.getName()), null));
+    dag.addVertex(vA).addVertex(vB);
+    testMemory(dag, false);
+  }
+
+  /** A -> B over a ONE_TO_ONE edge. */
+  @Ignore
+  @Test (timeout = 600000)
+  public void testMemoryOneToOne() throws Exception {
+    testMemory(createTwoVertexDag("testMemoryOneToOne", DataMovementType.ONE_TO_ONE), true);
+  }
+
+  /** A -> B over a BROADCAST edge. */
+  @Ignore
+  @Test (timeout = 600000)
+  public void testMemoryBroadcast() throws Exception {
+    testMemory(createTwoVertexDag("testMemoryBroadcast", DataMovementType.BROADCAST), true);
+  }
+
+  /** A -> B over a SCATTER_GATHER edge. */
+  @Ignore
+  @Test (timeout = 600000)
+  public void testMemoryScatterGather() throws Exception {
+    testMemory(
+        createTwoVertexDag("testMemoryScatterGather", DataMovementType.SCATTER_GATHER), true);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 87ffead..1e7faf9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -23,6 +23,7 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -49,6 +50,9 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeManagerPlugin;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
@@ -69,6 +73,7 @@ import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.client.VertexStatus.State;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.MockDAGAppMaster.CountersDelegate;
+import org.apache.tez.dag.app.MockDAGAppMaster.EventsDelegate;
 import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
 import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData;
 import org.apache.tez.dag.app.MockDAGAppMaster.StatisticsDelegate;
@@ -84,13 +89,17 @@ import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.VertexStatistics;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.IOStatistics;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -101,8 +110,7 @@ import com.google.common.primitives.Ints;
 public class TestMockDAGAppMaster {
   private static final Log LOG = LogFactory.getLog(TestMockDAGAppMaster.class);
   static Configuration defaultConf;
-  static FileSystem localFs;
-  
+  static FileSystem localFs;  
   static {
     try {
       defaultConf = new Configuration(false);
@@ -116,6 +124,24 @@ public class TestMockDAGAppMaster {
     }
   }
   
+  /** Events delegate that adds one data movement event for each output of the task. */
+  static class TestEventsDelegate implements EventsDelegate {
+    // An output with exactly one physical edge gets a DataMovementEvent; any other
+    // output gets a CompositeDataMovementEvent created with its physical edge count.
+    @Override
+    public void getEvents(TaskSpec taskSpec, List<TezEvent> events) {
+      for (OutputSpec outputSpec : taskSpec.getOutputs()) {
+        int edgeCount = outputSpec.getPhysicalEdgeCount();
+        EventMetaData sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
+            taskSpec.getVertexName(), outputSpec.getDestinationVertexName(),
+            taskSpec.getTaskAttemptID());
+        events.add(edgeCount == 1
+            ? new TezEvent(DataMovementEvent.create(0, 0, 0, null), sourceInfo)
+            : new TezEvent(CompositeDataMovementEvent.create(0, edgeCount, null), sourceInfo));
+      }
+    }
+  }
+  
   @Test (timeout = 5000)
   public void testLocalResourceSetup() throws Exception {
     TezConfiguration tezconf = new TezConfiguration(defaultConf);
@@ -196,7 +222,7 @@ public class TestMockDAGAppMaster {
     MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
     MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
     mockLauncher.startScheduling(false);
-    mockApp.sendDMEvents = true;
+    mockApp.eventsDelegate = new TestEventsDelegate();
     DAG dag = DAG.create("testBasicEvents");
     Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 2);
     Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 2);
@@ -227,7 +253,8 @@ public class TestMockDAGAppMaster {
     Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
     VertexImpl vImpl = (VertexImpl) dagImpl.getVertex(vB.getName());
     TaskImpl tImpl = (TaskImpl) vImpl.getTask(1);
-    List<TezEvent> tEvents = tImpl.getTaskEvents();
+    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 1);
+    List<TezEvent> tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 1000).getEvents();
     Assert.assertEquals(2, tEvents.size()); // 2 from vA
     Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
     Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
@@ -240,7 +267,8 @@ public class TestMockDAGAppMaster {
         (targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0));
     vImpl = (VertexImpl) dagImpl.getVertex(vC.getName());
     tImpl = (TaskImpl) vImpl.getTask(1);
-    tEvents = tImpl.getTaskEvents();
+    taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 1);
+    tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 1000).getEvents();
     Assert.assertEquals(2, tEvents.size()); // 2 from vA
     Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
     Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
@@ -253,7 +281,8 @@ public class TestMockDAGAppMaster {
         (targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0));
     vImpl = (VertexImpl) dagImpl.getVertex(vD.getName());
     tImpl = (TaskImpl) vImpl.getTask(1);
-    tEvents = tImpl.getTaskEvents();
+    taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 1);
+    tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 1000).getEvents();
     Assert.assertEquals(1, tEvents.size()); // 1 from vA
     Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
     Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex());
@@ -261,6 +290,125 @@ public class TestMockDAGAppMaster {
 
     tezClient.stop();
   }
+  
+  /** Test EdgeManagerPlugin that routes source task i to input 0 of destination task i. */
+  public static class LegacyEdgeTestEdgeManager extends EdgeManagerPlugin {
+    List<Integer> destinationInputIndices =
+        Collections.unmodifiableList(Collections.singletonList(0));
+    public LegacyEdgeTestEdgeManager(EdgeManagerPluginContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() throws Exception {
+    }
+
+    @Override
+    public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) throws Exception {
+      return 1;
+    }
+
+    @Override
+    public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) throws Exception {
+      return 1;
+    }
+
+    @Override
+    public void routeDataMovementEventToDestination(DataMovementEvent event, int srcTask,
+        int srcOutput, Map<Integer, List<Integer>> routes) {
+      routes.put(srcTask, destinationInputIndices);
+    }
+
+    @Override
+    public void routeInputSourceTaskFailedEventToDestination(int srcTask,
+        Map<Integer, List<Integer>> routes) {
+      routes.put(srcTask, destinationInputIndices);
+    }
+
+    @Override
+    public int routeInputErrorEventToSource(InputReadErrorEvent event, int destTask,
+        int failedInput) {
+      return destTask;
+    }
+
+    @Override
+    public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+      return 1;
+    }
+  }
+  
+  /**
+   * Verifies event routing in a DAG that mixes SCATTER_GATHER edges with legacy plugin edges.
+   */
+  @Test (timeout = 100000)
+  public void testMixedEdgeRouting() throws Exception {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
+    tezClient.start();
+
+    MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+    MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+    mockLauncher.startScheduling(false);
+    mockApp.eventsDelegate = new TestEventsDelegate();
+    DAG dag = DAG.create("testMixedEdgeRouting");
+    Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1);
+    Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 1);
+    Vertex vC = Vertex.create("C", ProcessorDescriptor.create("Proc.class"), 1);
+    Vertex vD = Vertex.create("D", ProcessorDescriptor.create("Proc.class"), 1);
+    Vertex vE = Vertex.create("E", ProcessorDescriptor.create("Proc.class"), 1);
+    dag.addVertex(vA).addVertex(vB).addVertex(vC)
+        .addVertex(vD).addVertex(vE)
+        .addEdge(
+            Edge.create(vA, vC, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+                DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+                OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
+        .addEdge(
+            Edge.create(vB, vC, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+                DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+                OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
+        .addEdge(
+            Edge.create(vA, vD, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+                DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+                OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
+        .addEdge(
+            Edge.create(vB, vD, EdgeProperty.create(
+                EdgeManagerPluginDescriptor.create(LegacyEdgeTestEdgeManager.class.getName()),
+                DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+                OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
+        .addEdge(
+            Edge.create(vB, vE, EdgeProperty.create(
+              EdgeManagerPluginDescriptor.create(LegacyEdgeTestEdgeManager.class.getName()),
+              DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+              OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
+
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    mockLauncher.waitTillContainersLaunched();
+    DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+    mockLauncher.startScheduling(true);
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+    // vC uses on demand routing and its task does not provide events
+    VertexImpl vImpl = (VertexImpl) dagImpl.getVertex(vC.getName());
+    Assert.assertEquals(true, vImpl.useOnDemandRouting);
+    TaskImpl tImpl = (TaskImpl) vImpl.getTask(0);
+    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
+    Assert.assertEquals(0, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
+    // vD is mixed mode and does not use on demand routing and its task provides events
+    vImpl = (VertexImpl) dagImpl.getVertex(vD.getName());
+    Assert.assertEquals(false, vImpl.useOnDemandRouting);
+    tImpl = (TaskImpl) vImpl.getTask(0);
+    taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
+    Assert.assertEquals(2, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
+    // vE has a single legacy edge (from vB), does not use on demand routing; its task provides 1 event
+    vImpl = (VertexImpl) dagImpl.getVertex(vE.getName());
+    Assert.assertEquals(false, vImpl.useOnDemandRouting);
+    tImpl = (TaskImpl) vImpl.getTask(0);
+    taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
+    Assert.assertEquals(1, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
+
+    tezClient.stop();
+  }
 
   @Test (timeout = 10000)
   public void testBasicCounters() throws Exception {

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index f974f40..db8eff1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -42,7 +42,6 @@ import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
@@ -62,6 +61,7 @@ import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -186,7 +186,7 @@ public class TestTaskAttemptListenerImplTezDag {
       new TezEvent(new TaskAttemptCompletedEvent(), null)
     );
 
-    EventHandler eventHandler = generateHeartbeat(events);
+    generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
 
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
     verify(eventHandler, times(2)).handle(arg.capture());
@@ -212,7 +212,7 @@ public class TestTaskAttemptListenerImplTezDag {
     List<TezEvent> events = Arrays.asList(
       new TezEvent(new TaskAttemptCompletedEvent(), null)
     );
-    final EventHandler eventHandler = generateHeartbeat(events);
+    generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
 
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
     verify(eventHandler, times(1)).handle(arg.capture());
@@ -222,18 +222,29 @@ public class TestTaskAttemptListenerImplTezDag {
     assertEquals("only event should be route event", VertexEventType.V_ROUTE_EVENT,
         event.getType());
   }
+  
+  // The response must carry the next event id and the events set up for the mocked vertex.
+  @Test (timeout = 5000)
+  public void testTaskHeartbeatResponse() throws Exception {
+    List<TezEvent> pendingForTask = new ArrayList<TezEvent>();
+    TezHeartbeatResponse reply =
+        generateHeartbeat(new ArrayList<TezEvent>(), 0, 1, 2, pendingForTask);
+    assertEquals(2, reply.getNextFromEventId());
+    assertEquals(1, reply.getLastRequestId());
+    assertEquals(pendingForTask, reply.getEvents());
+  }
 
-  private EventHandler generateHeartbeat(List<TezEvent> events) throws IOException, TezException {
+  private TezHeartbeatResponse generateHeartbeat(List<TezEvent> events,
+      int fromEventId, int maxEvents, int nextFromEventId,
+      List<TezEvent> sendEvents) throws IOException, TezException {
     ContainerId containerId = createContainerId(appId, 1);
     long requestId = 0;
     Vertex vertex = mock(Vertex.class);
-    Task task = mock(Task.class);
 
     doReturn(vertex).when(dag).getVertex(vertexID);
     doReturn("test_vertex").when(vertex).getName();
-    doReturn(task).when(vertex).getTask(taskID);
-
-    doReturn(new ArrayList<TezEvent>()).when(task).getTaskAttemptTezEvents(taskAttemptID, 0, 1);
+    TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(nextFromEventId, sendEvents);
+    doReturn(eventInfo).when(vertex).getTaskAttemptTezEvents(taskAttemptID, fromEventId, maxEvents);
 
     taskAttemptListener.registerRunningContainer(containerId);
     taskAttemptListener.registerTaskAttempt(amContainerTask, containerId);
@@ -243,10 +254,10 @@ public class TestTaskAttemptListenerImplTezDag {
     doReturn(taskAttemptID).when(request).getCurrentTaskAttemptID();
     doReturn(++requestId).when(request).getRequestId();
     doReturn(events).when(request).getEvents();
+    doReturn(maxEvents).when(request).getMaxEvents();
+    doReturn(fromEventId).when(request).getStartIndex();
 
-    taskAttemptListener.heartbeat(request);
-
-    return eventHandler;
+    return taskAttemptListener.heartbeat(request);
   }
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index ba40146..d2aa2d0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -51,9 +51,9 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.MockDNSToSwitchMapping;
 import org.apache.tez.dag.api.DataSinkDescriptor;
-import org.apache.tez.dag.api.EdgeManagerPlugin;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
 import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -1028,6 +1028,8 @@ public class TestDAGImpl {
         new EventMetaData(EventProducerConsumerType.INPUT, "vertex1", "vertex2", ta1.getID()));
     dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
     dispatcher.await();
+    v2.getTaskAttemptTezEvents(ta1.getID(), 0, 1000);
+    dispatcher.await();
 
     Assert.assertEquals(VertexState.FAILED, v2.getState());
     Assert.assertEquals(VertexState.KILLED, v1.getState());
@@ -1037,7 +1039,40 @@ public class TestDAGImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testEdgeManager_RouteInputSourceTaskFailedEventToDestination() {
+  public void testEdgeManager_RouteDataMovementEventToDestinationWithLegacyRouting() {
+    // Remove after legacy routing is removed
+    setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination);
+    dispatcher.getEventHandler().handle(
+        new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
+    dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(), 
+        null));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
+
+    VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
+    v1.useOnDemandRouting = false;
+    v2.useOnDemandRouting = false;
+    dispatcher.await();
+    Task t1= v2.getTask(0);
+    TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
+
+    DataMovementEvent daEvent = DataMovementEvent.create(ByteBuffer.wrap(new byte[0]));
+    TezEvent tezEvent = new TezEvent(daEvent, 
+        new EventMetaData(EventProducerConsumerType.INPUT, "vertex1", "vertex2", ta1.getID()));
+    dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
+    dispatcher.await();
+
+    Assert.assertEquals(VertexState.FAILED, v2.getState());
+    Assert.assertEquals(VertexState.KILLED, v1.getState());
+    String diag = StringUtils.join(v2.getDiagnostics(), ",");
+    Assert.assertTrue(diag.contains(ExceptionLocation.RouteDataMovementEventToDestination.name()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testEdgeManager_RouteInputSourceTaskFailedEventToDestinationLegacyRouting() {
+    // Remove after legacy routing is removed
     setupDAGWithCustomEdge(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination);
     dispatcher.getEventHandler().handle(
         new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
@@ -1048,6 +1083,8 @@ public class TestDAGImpl {
 
     VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
     VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
+    v1.useOnDemandRouting = false;
+    v2.useOnDemandRouting = false;
     dispatcher.await();
 
     Task t1= v2.getTask(0);
@@ -1057,13 +1094,15 @@ public class TestDAGImpl {
         new EventMetaData(EventProducerConsumerType.INPUT,"vertex1", "vertex2", ta1.getID()));
     dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
     dispatcher.await();
-    // 
+    v2.getTaskAttemptTezEvents(ta1.getID(), 0, 1000);
+    dispatcher.await();
     Assert.assertEquals(VertexState.FAILED, v2.getState());
+    
     Assert.assertEquals(VertexState.KILLED, v1.getState());
     String diag = StringUtils.join(v2.getDiagnostics(), ",");
     Assert.assertTrue(diag.contains(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination.name()));
   }
-  
+
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testEdgeManager_GetNumDestinationConsumerTasks() {
@@ -1773,7 +1812,7 @@ public class TestDAGImpl {
         TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT));
   }
 
-  public static class CustomizedEdgeManager extends EdgeManagerPlugin {
+  public static class CustomizedEdgeManager extends EdgeManagerPluginOnDemand {
 
     public static enum ExceptionLocation {
       Initialize,
@@ -1861,6 +1900,47 @@ public class TestDAGImpl {
       }
       return 0;
     }
+
+    @Override
+    public int routeInputErrorEventToSource(int destinationTaskIndex,
+        int destinationFailedInputIndex) throws Exception {
+      if (exLocation == ExceptionLocation.RouteInputErrorEventToSource) {
+        throw new Exception(exLocation.name());
+      }
+      return 0;
+    }
+
+    @Override
+    public EventRouteMetadata routeDataMovementEventToDestination(int sourceTaskIndex,
+        int sourceOutputIndex, int destinationTaskIndex) throws Exception {
+      if (exLocation == ExceptionLocation.RouteDataMovementEventToDestination) {
+        throw new Exception(exLocation.name());
+      }
+      return null;
+    }
+
+    @Override
+    public EventRouteMetadata routeCompositeDataMovementEventToDestination(
+        int sourceTaskIndex, int destinationTaskIndex)
+        throws Exception {
+      if (exLocation == ExceptionLocation.RouteDataMovementEventToDestination) {
+        throw new Exception(exLocation.name());
+      }
+      return null;
+    }
+
+    @Override
+    public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+        int sourceTaskIndex, int destinationTaskIndex) throws Exception {
+      if (exLocation == ExceptionLocation.RouteInputSourceTaskFailedEventToDestination) {
+        throw new Exception(exLocation.name());
+      }
+      return null;
+    }
+
+    @Override
+    public void prepareForRouting() throws Exception {
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 99ec6cf..a8eaca1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -73,6 +73,8 @@ import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
@@ -107,6 +109,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.TaskAttemptEventInfo;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
@@ -148,7 +151,6 @@ import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
 import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
@@ -186,8 +188,6 @@ import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Matchers;
 import org.mockito.Mockito;
 import org.mockito.internal.util.collections.Sets;
 
@@ -2462,6 +2462,101 @@ public class TestVertexImpl {
     VertexImpl v = vertices.get("vertex2");
     startVertex(v);
   }
+  
+  @Test (timeout = 5000)
+  public void testVertexGetTAAttempts() throws Exception {
+    initAllVertices(VertexState.INITED);
+    VertexImpl v1 = vertices.get("vertex1");
+    startVertex(v1);
+    VertexImpl v2 = vertices.get("vertex2");
+    startVertex(v2);
+    VertexImpl v3 = vertices.get("vertex3");
+    VertexImpl v4 = vertices.get("vertex4");
+    
+    Assert.assertEquals(VertexState.RUNNING, v4.getState());
+    Assert.assertEquals(1, v4.sourceVertices.size());
+    Edge e = v4.sourceVertices.get(v3);
+    TezTaskAttemptID v3TaId = TezTaskAttemptID.getInstance(
+        TezTaskID.getInstance(v3.getVertexId(), 0), 0);
+    TezTaskAttemptID v4TaId = TezTaskAttemptID.getInstance(
+        TezTaskID.getInstance(v4.getVertexId(), 0), 0);
+    
+    for (int i=0; i<5; ++i) {
+      v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+          new TezEvent(DataMovementEvent.create(0, null), 
+              new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+    }
+    dispatcher.await();
+    // verify all events have been put in pending.
+    // this is not necessary after legacy routing has been removed
+    Assert.assertEquals(5, v4.pendingTaskEvents.size());
+    v4.scheduleTasks(new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>());
+    // verify all events have been moved to taskEvents
+    Assert.assertEquals(5, v4.getOnDemandRouteEvents().size());
+    for (int i=5; i<11; ++i) {
+      v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+          new TezEvent(DataMovementEvent.create(0, null), 
+              new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+    }
+    dispatcher.await();
+    // verify all events are in taskEvents
+    Assert.assertEquals(11, v4.getOnDemandRouteEvents().size());
+    
+    TaskAttemptEventInfo eventInfo;
+    EdgeManagerPluginOnDemand mockPlugin = mock(EdgeManagerPluginOnDemand.class);
+    EventRouteMetadata mockRoute = EventRouteMetadata.create(1, new int[]{0});
+    e.edgeManager = mockPlugin;
+    // source task id will not match, so routing returns null for every event
+    when(mockPlugin.routeDataMovementEventToDestination(1, 0, 0)).thenReturn(mockRoute);
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, 0, 1);
+    Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
+    Assert.assertEquals(0, eventInfo.getEvents().size()); // no events
+    
+    int fromEventId = 0;
+    // source task id will match. all events will be returned
+    // max events is respected.
+    when(
+        mockPlugin.routeDataMovementEventToDestination(anyInt(),
+            anyInt(), anyInt())).thenReturn(mockRoute);
+    for (int i=0; i<11; ++i) {
+      eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 1);
+      fromEventId = eventInfo.getNextFromEventId();
+      Assert.assertEquals((i+1), fromEventId);
+      Assert.assertEquals(1, eventInfo.getEvents().size());
+    }
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 1);
+    Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
+    Assert.assertEquals(0, eventInfo.getEvents().size()); // no events
+    
+    // change max events to larger value. max events does not evenly divide total events
+    fromEventId = 0;
+    for (int i=1; i<=2; ++i) {
+      eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 5);
+      fromEventId = eventInfo.getNextFromEventId();
+      Assert.assertEquals((i*5), fromEventId);
+      Assert.assertEquals(5, eventInfo.getEvents().size());
+    }
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 5);
+    Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
+    Assert.assertEquals(1, eventInfo.getEvents().size()); // remainder events
+    
+    // return more events than evenly fit in max size
+    mockRoute = EventRouteMetadata.create(2, new int[]{0, 0});    
+    when(
+        mockPlugin.routeDataMovementEventToDestination(anyInt(),
+            anyInt(), anyInt())).thenReturn(mockRoute);
+    fromEventId = 0;
+    int lastFromEventId = 0;
+    for (int i=1; i<=4; ++i) {
+      eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 5);
+      fromEventId = eventInfo.getNextFromEventId();
+      Assert.assertEquals((i%2 > 0 ? (lastFromEventId+=2) : (lastFromEventId+=3)), fromEventId);     
+      Assert.assertEquals(5, eventInfo.getEvents().size());
+    }
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 5);
+    Assert.assertEquals(11, eventInfo.getNextFromEventId()); // all events traversed
+    Assert.assertEquals(2, eventInfo.getEvents().size()); // remainder events
+  }
 
   @Test(timeout = 5000)
   public void testVertexReconfigurePlannedAfterInit() throws Exception {
@@ -2632,11 +2727,15 @@ public class TestVertexImpl {
   }
   
   @Test(timeout = 5000)
-  public void testVertexPendingTaskEvents() {
+  public void testVertexPendingTaskEventsLegacyRouting() {
+    // Remove after bulk routing API is removed
     initAllVertices(VertexState.INITED);
     VertexImpl v3 = vertices.get("vertex3");
     VertexImpl v2 = vertices.get("vertex2");
     VertexImpl v1 = vertices.get("vertex1");
+    v1.useOnDemandRouting = false;
+    v2.useOnDemandRouting = false;
+    v3.useOnDemandRouting = false;
     
     startVertex(v1);
     
@@ -4595,8 +4694,6 @@ public class TestVertexImpl {
     dispatcher.await();
     Assert.assertEquals(VertexState.INITED, v1.getState());
     Assert.assertEquals(5, v1.getTotalTasks());
-    // task events get buffered
-    Assert.assertEquals(5, v1.pendingTaskEvents.size());
     Assert.assertEquals(RootInputVertexManager.class.getName(), v1
         .getVertexManager().getPlugin().getClass().getName());
     for (int i=0; i < v1Hints.size(); ++i) {
@@ -4609,6 +4706,16 @@ public class TestVertexImpl {
       Assert.assertEquals(1, inputSpecs.get(0).getPhysicalEdgeCount());
     }
     
+    // fake scheduling start to trigger edge routing to begin
+    v1.scheduleTasks(new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>());
+    // check all tasks get their events
+    for (int i=0; i<v1.getTotalTasks(); ++i) {
+      Assert.assertEquals(
+          1,
+          v1.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance(v1.getTask(i).getTaskId(), 0),
+              0, 100).getEvents().size());
+    }
+    
     VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager) vertices.get("vertex2");
     Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
     
@@ -4627,7 +4734,6 @@ public class TestVertexImpl {
     dispatcher.getEventHandler().handle(
         new VertexEventRouteEvent(v2.getVertexId(), events));
     dispatcher.await();
-    Assert.assertEquals(1, v2.pendingTaskEvents.size());
     
     RootInputInitializerManagerControlled initializerManager2 = v2.getRootInputInitializerManager();
     List<TaskLocationHint> v2Hints = createTaskLocationHints(10);
@@ -4635,14 +4741,99 @@ public class TestVertexImpl {
     dispatcher.await();
     Assert.assertEquals(VertexState.INITED, v2.getState());
     Assert.assertEquals(10, v2.getTotalTasks());
+    Assert.assertEquals(RootInputVertexManager.class.getName(), v2
+        .getVertexManager().getPlugin().getClass().getName());
+    for (int i=0; i < v2Hints.size(); ++i) {
+      Assert.assertEquals(v2Hints.get(i), v2.getTaskLocationHints()[i]);
+    }
+    Assert.assertEquals(true, initializerManager2.hasShutDown);
+    
+    // fake scheduling start to trigger edge routing to begin
+    v2.scheduleTasks(new LinkedList<VertexManagerPluginContext.TaskWithLocationHint>());
+    // check all tasks get their events
+    for (int i=0; i<v2.getTotalTasks(); ++i) {
+      Assert.assertEquals(
+          ((i==0) ? 2 : 1),
+          v2.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance(v2.getTask(i).getTaskId(), 0),
+              0, 100).getEvents().size());
+    }
+    for (int i = 0; i < 10; i++) {
+      List<InputSpec> inputSpecs = v1.getInputSpecList(i);
+      Assert.assertEquals(1, inputSpecs.size());
+      Assert.assertEquals(1, inputSpecs.get(0).getPhysicalEdgeCount());
+    }
+  }
+  
+  @Test(timeout = 5000)
+  public void testVertexWithInitializerSuccessLegacyRouting() throws Exception {
+    // Remove after legacy routing is removed
+    useCustomInitializer = true;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
+    setupPostDagCreation();
+
+    VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager) vertices
+        .get("vertex1");
+    v1.useOnDemandRouting = false;
+    dispatcher.getEventHandler().handle(
+        new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+    RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
+    List<TaskLocationHint> v1Hints = createTaskLocationHints(5);
+    initializerManager1.completeInputInitialization(0, 5, v1Hints);
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITED, v1.getState());
+    Assert.assertEquals(5, v1.getTotalTasks());
+    Assert.assertEquals(RootInputVertexManager.class.getName(), v1
+        .getVertexManager().getPlugin().getClass().getName());
+    for (int i=0; i < v1Hints.size(); ++i) {
+      Assert.assertEquals(v1Hints.get(i), v1.getTaskLocationHints()[i]);
+    }
+    Assert.assertEquals(true, initializerManager1.hasShutDown);
+    for (int i = 0; i < 5; i++) {
+      List<InputSpec> inputSpecs = v1.getInputSpecList(i);
+      Assert.assertEquals(1, inputSpecs.size());
+      Assert.assertEquals(1, inputSpecs.get(0).getPhysicalEdgeCount());
+    }
     // task events get buffered
-    Assert.assertEquals(11, v2.pendingTaskEvents.size());
+    Assert.assertEquals(5, v1.pendingTaskEvents.size());
+    
+    VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager) vertices.get("vertex2");
+    Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
+    v2.useOnDemandRouting = false;
+
+    // non-task events don't get buffered
+    List<TezEvent> events = Lists.newLinkedList();
+    TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0);
+    TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0);
+    events.add(new TezEvent(
+        VertexManagerEvent.create("vertex2", ByteBuffer.wrap(new byte[0])), new EventMetaData(
+            EventProducerConsumerType.PROCESSOR, "vertex1", "vertex2",
+            ta0_t0_v1)));
+    events.add(new TezEvent(InputDataInformationEvent.createWithSerializedPayload(0,
+        ByteBuffer.wrap(new byte[0])),
+        new EventMetaData(EventProducerConsumerType.INPUT, "vertex2",
+            "NULL_VERTEX", null)));
+    dispatcher.getEventHandler().handle(
+        new VertexEventRouteEvent(v2.getVertexId(), events));
+    dispatcher.await();
+    Assert.assertEquals(1, v2.pendingTaskEvents.size());
+    
+    RootInputInitializerManagerControlled initializerManager2 = v2.getRootInputInitializerManager();
+    List<TaskLocationHint> v2Hints = createTaskLocationHints(10);
+    initializerManager2.completeInputInitialization(0, 10, v2Hints);
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITED, v2.getState());
+    Assert.assertEquals(10, v2.getTotalTasks());
     Assert.assertEquals(RootInputVertexManager.class.getName(), v2
         .getVertexManager().getPlugin().getClass().getName());
     for (int i=0; i < v2Hints.size(); ++i) {
       Assert.assertEquals(v2Hints.get(i), v2.getTaskLocationHints()[i]);
     }
     Assert.assertEquals(true, initializerManager2.hasShutDown);
+    // task events get buffered
+    Assert.assertEquals(11, v2.pendingTaskEvents.size());
     for (int i = 0; i < 10; i++) {
       List<InputSpec> inputSpecs = v1.getInputSpecList(i);
       Assert.assertEquals(1, inputSpecs.size());
@@ -4650,6 +4841,7 @@ public class TestVertexImpl {
     }
   }
 
+  
   @Test(timeout = 5000)
   public void testVertexWithInputDistributor() throws Exception {
     useCustomInitializer = true;

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
index 09f9a20..9cb914f 100644
--- a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
+++ b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
@@ -21,13 +21,13 @@ package org.apache.tez.test;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.tez.dag.api.EdgeManagerPlugin;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
-public class EdgeManagerForTest extends EdgeManagerPlugin {
+public class EdgeManagerForTest extends EdgeManagerPluginOnDemand {
 
   private UserPayload userPayload;
 
@@ -78,6 +78,35 @@ public class EdgeManagerForTest extends EdgeManagerPlugin {
   public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
       Map<Integer, List<Integer>> destinationTaskAndInputIndices) { 
   }
+
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(int sourceTaskIndex,
+      int sourceOutputIndex, int destinationTaskIndex) throws Exception {
+    return null;
+  }
+
+  @Override
+  public EventRouteMetadata routeCompositeDataMovementEventToDestination(
+      int sourceTaskIndex, int destinationTaskIndex)
+      throws Exception {
+    return null;
+  }
+
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+      int sourceTaskIndex, int destinationTaskIndex) throws Exception {
+    return null;
+  }
+
+  @Override
+  public void prepareForRouting() throws Exception {
+  }
+
+  @Override
+  public int routeInputErrorEventToSource(int destinationTaskIndex, int destinationFailedInputIndex)
+      throws Exception {
+    return 0;
+  }
   
   // End of overridden methods
 

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index f8b8621..921095c 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -49,6 +49,7 @@ public abstract class RuntimeTask {
   protected final Configuration tezConf;
   protected final TezUmbilical tezUmbilical;
   protected final AtomicInteger eventCounter;
+  protected final AtomicInteger nextFromEventId;
   private final AtomicBoolean taskDone;
   private final TaskCounterUpdater counterUpdater;
   private final TaskStatistics statistics;
@@ -60,6 +61,7 @@ public abstract class RuntimeTask {
     this.tezUmbilical = tezUmbilical;
     this.tezCounters = new TezCounters();
     this.eventCounter = new AtomicInteger(0);
+    this.nextFromEventId = new AtomicInteger(0);
     this.progress = 0.0f;
     this.taskDone = new AtomicBoolean(false);
     this.statistics = new TaskStatistics();
@@ -130,6 +132,14 @@ public abstract class RuntimeTask {
   public int getEventCounter() {
     return eventCounter.get();
   }
+  
+  public int getNextFromEventId() {
+    return nextFromEventId.get();
+  }
+  
+  public void setNextFromEventId(int nextFromEventId) {
+    this.nextFromEventId.set(nextFromEventId);
+  }
 
   public boolean isTaskDone() {
     return taskDone.get();

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
index 10699ac..cecc706 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
@@ -32,6 +32,7 @@ public class TezHeartbeatResponse implements Writable {
   private long lastRequestId;
   private boolean shouldDie = false;
   private List<TezEvent> events;
+  private int nextFromEventId;
 
   public TezHeartbeatResponse() {
   }
@@ -51,6 +52,10 @@ public class TezHeartbeatResponse implements Writable {
   public long getLastRequestId() {
     return lastRequestId;
   }
+  
+  public int getNextFromEventId() {
+    return nextFromEventId;
+  }
 
   public void setEvents(List<TezEvent> events) {
     this.events = Collections.unmodifiableList(events);
@@ -63,11 +68,16 @@ public class TezHeartbeatResponse implements Writable {
   public void setShouldDie() {
     this.shouldDie = true;
   }
+  
+  public void setNextFromEventId(int nextFromEventId) {
+    this.nextFromEventId = nextFromEventId;
+  }
 
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeLong(lastRequestId);
     out.writeBoolean(shouldDie);
+    out.writeInt(nextFromEventId);
     if(events != null) {
       out.writeBoolean(true);
       out.writeInt(events.size());
@@ -83,6 +93,7 @@ public class TezHeartbeatResponse implements Writable {
   public void readFields(DataInput in) throws IOException {
     lastRequestId = in.readLong();
     shouldDie = in.readBoolean();
+    nextFromEventId = in.readInt();
     if(in.readBoolean()) {
       int eventCount = in.readInt();
       events = new ArrayList<TezEvent>(eventCount);
@@ -99,6 +110,7 @@ public class TezHeartbeatResponse implements Writable {
     return "{ "
         + " lastRequestId=" + lastRequestId
         + ", shouldDie=" + shouldDie
+        + ", nextFromEventId=" + nextFromEventId
         + ", eventCount=" + (events != null ? events.size() : 0)
         + " }";
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 7324abd..3d1d1a2 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -240,8 +240,9 @@ public class TaskReporter {
       }
 
       long requestId = requestCounter.incrementAndGet();
+      int fromEventId = task.getNextFromEventId();
       TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr,
-          task.getTaskAttemptID(), task.getEventCounter(), maxEventsToGet);
+          task.getTaskAttemptID(), fromEventId, maxEventsToGet);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Sending heartbeat to AM, request=" + request);
       }
@@ -271,11 +272,12 @@ public class TaskReporter {
               + " heartbeat response, eventCount=" + response.getEvents().size());
         }
       } else {
+        task.setNextFromEventId(response.getNextFromEventId());
         if (response.getEvents() != null && !response.getEvents().isEmpty()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
-                + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size());
-          }
+          LOG.info("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
+              + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size()
+              + " fromEventId=" + fromEventId
+              + " nextFromEventId=" + response.getNextFromEventId());
           // This should ideally happen in a separate thread
           numEventsReceived = response.getEvents().size();
           task.handleEvents(response.getEvents());