You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/05/08 00:44:40 UTC

[3/3] tez git commit: TEZ-776. Reduce AM mem usage caused by storing TezEvents (bikas)

TEZ-776. Reduce AM mem usage caused by storing TezEvents (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/05f77fe2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/05f77fe2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/05f77fe2

Branch: refs/heads/master
Commit: 05f77fe2b210341a16ead9fc51e53093c836d860
Parents: a382324
Author: Bikas Saha <bi...@apache.org>
Authored: Thu May 7 15:44:38 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu May 7 15:44:38 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 tez-api/findbugs-exclude.xml                    |  12 +
 .../apache/tez/dag/api/EdgeManagerPlugin.java   |   4 +-
 .../tez/dag/api/EdgeManagerPluginOnDemand.java  | 332 +++++++++++++++++++
 .../api/events/CompositeDataMovementEvent.java  |  21 ++
 .../runtime/api/events/DataMovementEvent.java   |  33 +-
 .../runtime/api/events/InputFailedEvent.java    |  16 +
 tez-dag/findbugs-exclude.xml                    |   5 +-
 .../tez/dag/app/TaskAttemptEventInfo.java       |  40 +++
 .../dag/app/TaskAttemptListenerImpTezDag.java   |   6 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   5 +
 .../dag/app/dag/impl/BroadcastEdgeManager.java  |  41 ++-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   | 244 ++++++++++++--
 .../dag/app/dag/impl/OneToOneEdgeManager.java   |  67 +++-
 .../app/dag/impl/ScatterGatherEdgeManager.java  |  83 ++++-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |   6 -
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 323 ++++++++++++++----
 .../apache/tez/dag/app/MockDAGAppMaster.java    |  25 +-
 .../tez/dag/app/TestMemoryWithEvents.java       | 219 ++++++++++++
 .../tez/dag/app/TestMockDAGAppMaster.java       | 160 ++++++++-
 .../app/TestTaskAttemptListenerImplTezDag.java  |  33 +-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |  90 ++++-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 208 +++++++++++-
 .../org/apache/tez/test/EdgeManagerForTest.java |  33 +-
 .../org/apache/tez/runtime/RuntimeTask.java     |  10 +
 .../runtime/api/impl/TezHeartbeatResponse.java  |  12 +
 .../apache/tez/runtime/task/TaskReporter.java   |  12 +-
 .../vertexmanager/ShuffleVertexManager.java     | 117 ++++++-
 .../tez/test/TestExceptionPropagation.java      |  39 ++-
 29 files changed, 2000 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8de61b0..ba8e9d8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@ INCOMPATIBLE CHANGES
     Default max limit increased. Should not affect existing users.
 
 ALL CHANGES:
+  TEZ-776. Reduce AM mem usage caused by storing TezEvents
   TEZ-2423. Tez UI: Remove Attempt Index column from task->attempts page
   TEZ-2416. Tez UI: Make tooltips display faster.
   TEZ-2404. Handle DataMovementEvent before its TaskAttemptCompletedEvent

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-api/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-api/findbugs-exclude.xml b/tez-api/findbugs-exclude.xml
index b928a44..07792e6 100644
--- a/tez-api/findbugs-exclude.xml
+++ b/tez-api/findbugs-exclude.xml
@@ -85,4 +85,16 @@
     <Bug pattern="SWL_SLEEP_WITH_LOCK_HELD"/>
   </Match>
 
+  <Match>
+    <Class name="org.apache.tez.dag.api.EdgeManagerPluginOnDemand$EventRouteMetadata"/>
+    <Method name="getSourceIndices"/>
+    <Bug pattern="EI_EXPOSE_REP"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.dag.api.EdgeManagerPluginOnDemand$EventRouteMetadata"/>
+    <Method name="getTargetIndices"/>
+    <Bug pattern="EI_EXPOSE_REP"/>
+  </Match>
+
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java
index 8768e7d..4e22f63 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java
@@ -101,7 +101,7 @@ public abstract class EdgeManagerPlugin {
   public abstract void routeDataMovementEventToDestination(DataMovementEvent event,
       int sourceTaskIndex, int sourceOutputIndex,
       Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception;
-  
+
   /**
    * Return the routing information to inform consumers about the failure of a
    * source task whose outputs have been potentially lost. The return map has
@@ -144,7 +144,7 @@ public abstract class EdgeManagerPlugin {
       int destinationTaskIndex, int destinationFailedInputIndex) throws Exception;
 
   /**
-   * Return ahe {@link org.apache.tez.dag.api.EdgeManagerPluginContext} for this specific instance of
+   * Return the {@link org.apache.tez.dag.api.EdgeManagerPluginContext} for this specific instance of
    * the vertex manager.
    *
    * @return the {@link org.apache.tez.dag.api.EdgeManagerPluginContext} for the input

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java
new file mode 100644
index 0000000..05c0c62
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java
@@ -0,0 +1,332 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+
+/**
+ * This abstract class defines the routing of events between tasks of producer and
+ * consumer vertices. The routing is bi-directional. Users can customize the
+ * routing by providing an implementation of this class.
+ */
+@Public
+@Unstable
+public abstract class EdgeManagerPluginOnDemand extends EdgeManagerPlugin {
+
+  /**
+   * Class to provide routing metadata for {@link Event}s to be routed between
+   * producer and consumer tasks. The routing data enables the system to send
+   * the event from the producer task output to the consumer task input.
+   */
+  public static class EventRouteMetadata {
+    private final int numEvents;
+    private final int[] targetIndices;
+    private final int[] sourceIndices;
+    
+    /**
+     * Create an {@link EventRouteMetadata} that will create numEvents copies of
+     * the {@link Event} to be routed. Use this to create
+     * {@link EventRouteMetadata} for {@link DataMovementEvent}s or
+     * {@link InputFailedEvent}s where the target input indices must be
+     * specified to route those events. Typically numEvents would be 1 for these
+     * events.
+     * 
+     * @param numEvents
+     *          Number of copies of the event to be routed
+     * @param targetIndices
+     *          Target input indices. The array length must match the number of
+     *          events specified when creating the {@link EventRouteMetadata}
+     *          object
+     * @return {@link EventRouteMetadata}
+     */
+    public static EventRouteMetadata create(int numEvents, int[] targetIndices) {
+      return new EventRouteMetadata(numEvents, targetIndices, null);
+    }
+    
+    /**
+     * Create an {@link EventRouteMetadata} that will create numEvents copies of
+     * the {@link Event} to be routed. Use this to create
+     * {@link EventRouteMetadata} for {@link CompositeDataMovementEvent} where
+     * the target input indices and source output indices must be specified to
+     * route those events. Typically numEvents would be 1 for these events.
+     * 
+     * @param numEvents
+     *          Number of copies of the event to be routed
+     * @param targetIndices
+     *          Target input indices. The array length must match the number of
+     *          events specified when creating the {@link EventRouteMetadata}
+     *          object
+     * @param sourceIndices
+     *          Source output indices. The array length must match the number of
+     *          events specified when creating the {@link EventRouteMetadata}
+     *          object
+     * @return {@link EventRouteMetadata}
+     */
+    public static EventRouteMetadata create(int numEvents, int[] targetIndices, int[] sourceIndices) {
+      return new EventRouteMetadata(numEvents, targetIndices, sourceIndices);
+    }
+
+    private EventRouteMetadata(int numEvents, int[] targetIndices, int[] sourceIndices) {
+      this.numEvents = numEvents;
+      this.targetIndices = targetIndices;
+      this.sourceIndices = sourceIndices;
+    }
+
+    /**
+     * Get the number of copies of the event to be routed
+     * @return Number of copies
+     */
+    public int getNumEvents() {
+      return numEvents;
+    }
+
+    /**
+     * Get the target input indices
+     * @return Target input indices
+     */
+    public @Nullable int[] getTargetIndices() {
+      return targetIndices;
+    }
+
+    /**
+     * Get the source output indices
+     * @return Source output indices
+     */
+    public @Nullable int[] getSourceIndices() {
+      return sourceIndices;
+    }
+  }
+
+  /**
+   * Create an instance of the {@link EdgeManagerPluginOnDemand}. Classes
+   * extending this to create a {@link EdgeManagerPluginOnDemand}, must provide
+   * the same constructor so that Tez can create an instance of the class at
+   * runtime.
+   * 
+   * @param context
+   *          the context within which this {@link EdgeManagerPluginOnDemand}
+   *          will run. Includes information like configuration which the user
+   *          may have specified while setting up the edge.
+   */
+  public EdgeManagerPluginOnDemand(EdgeManagerPluginContext context) {
+    super(context);
+  }
+
+  /**
+   * Initializes the EdgeManagerPlugin. This method is called in the following
+   * circumstances: <p> 1. When initializing an EdgeManagerPlugin for the first time.
+   * <p> 2. When an EdgeManagerPlugin is replaced at runtime. At this point, an
+   * EdgeManagerPlugin instance is created and setup by the user. The initialize
+   * method will be called with the original {@link EdgeManagerPluginContext} when the
+   * EdgeManagerPlugin is replaced.
+   * @throws Exception
+   */
+  public abstract void initialize() throws Exception;
+  
+  /**
+   * This method will be invoked just before routing of events will begin. The
+   * plugin can use this opportunity to perform any runtime initializations that
+   * depend on the actual state of the DAG or vertices.
+   */
+  public abstract void prepareForRouting() throws Exception;
+
+  /**
+   * Get the number of physical inputs on the destination task
+   * @param destinationTaskIndex Index of destination task for which number of 
+   * inputs is needed
+   * @return Number of physical inputs on the destination task
+   * @throws Exception
+   */
+  public abstract int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) throws Exception;
+
+  /**
+   * Get the number of physical outputs on the source task
+   * @param sourceTaskIndex Index of the source task for which number of outputs 
+   * is needed
+   * @return Number of physical outputs on the source task
+   * @throws Exception
+   */
+  public abstract int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) throws Exception;
+  
+  /**
+   * Get the number of destination tasks that consume data from the source task
+   * @param sourceTaskIndex Source task index
+   * @throws Exception
+   */
+  public abstract int getNumDestinationConsumerTasks(int sourceTaskIndex) throws Exception;
+  
+  /**
+   * Return the source task index to which to send the input error event
+   * 
+   * @param destinationTaskIndex
+   *          Destination task that reported the error
+   * @param destinationFailedInputIndex
+   *          Index of the physical input on the destination task that reported 
+   *          the error
+   * @return Index of the source task that created the unavailable input
+   * @throws Exception
+   */
+  public abstract int routeInputErrorEventToSource(int destinationTaskIndex,
+      int destinationFailedInputIndex) throws Exception;
+  
+  /**
+   * The method provides the {@link EventRouteMetadata} to route a
+   * {@link DataMovementEvent} produced by the given source task to the given
+   * destination task. The returned {@link EventRouteMetadata} should have the
+   * target input indices set to enable the routing. If the routing metadata is
+   * common across different events then the plugin can cache and reuse the same
+   * object.
+   * 
+   * @param sourceTaskIndex
+   *          The index of the task in the source vertex of this edge that
+   *          produced a {@link DataMovementEvent}
+   * @param sourceOutputIndex
+   *          Index of the physical output on the source task that produced the
+   *          event
+   * @param destinationTaskIndex
+   * @return {@link EventRouteMetadata} with target indices set. May be null if
+   *         the given destination task does not read input from the given
+   *         source task.
+   * @throws Exception
+   */
+  public abstract @Nullable EventRouteMetadata routeDataMovementEventToDestination(int sourceTaskIndex,
+      int sourceOutputIndex, int destinationTaskIndex) throws Exception;
+
+  /**
+   * The method provides the {@link EventRouteMetadata} to route a
+   * {@link CompositeDataMovementEvent} produced by the given source task to the
+   * given destination task. The returned {@link EventRouteMetadata} should have
+   * the target input indices and source output indices set to enable the
+   * routing. If the routing metadata is common across different events then the
+   * plugin can cache and reuse the same object.
+   * 
+   * @param sourceTaskIndex
+   *          The index of the task in the source vertex of this edge that
+   *          produced a {@link CompositeDataMovementEvent}
+   * @param destinationTaskIndex
+   *          The index of the task in the destination vertex of this edge
+   * @return {@link EventRouteMetadata} with source and target indices set. This
+   *         may be null if the destination task does not read data from the
+   *         source task.
+   * @throws Exception
+   */
+  public abstract @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination(
+      int sourceTaskIndex, int destinationTaskIndex) throws Exception;
+
+  /**
+   * The method provides the {@link EventRouteMetadata} to route an
+   * {@link InputFailedEvent} produced by the given source task to the given
+   * destination task. The returned {@link EventRouteMetadata} should have the
+   * target input indices set to enable the routing. If the routing metadata is
+   * common across different events then the plugin can cache and reuse the same
+   * object.
+   * 
+   * @param sourceTaskIndex
+   *          The index of the failed task in the source vertex of this edge.
+   * @param destinationTaskIndex
+   *          The index of a task in the destination vertex of this edge.
+   * @return {@link EventRouteMetadata} with target indices set. May be null if
+   *         the given destination task does not read input from the given
+   *         source task.
+   * @throws Exception
+   */
+  public abstract @Nullable EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+      int sourceTaskIndex, int destinationTaskIndex) throws Exception;
+  
+  /**
+   * Return the {@link org.apache.tez.dag.api.EdgeManagerPluginContext} for this specific instance of
+   * the edge manager plugin.
+   *
+   * @return the {@link org.apache.tez.dag.api.EdgeManagerPluginContext} for the input
+   */
+  public EdgeManagerPluginContext getContext() {
+    return super.getContext();
+  }
+
+  // Empty implementations of EdgeManagerPlugin interfaces that are not needed
+  /**
+   * Return the routing information to inform consumers about the source task
+   * output that is now available. The return map has the routing information.
+   * The event will be routed to every destination task index in the key of the
+   * map. Every physical input in the value for that task key will receive the
+   * input.
+   * 
+   * @param event
+   *          Data movement event that contains the output information
+   * @param sourceTaskIndex
+   *          Source task that produced the event
+   * @param sourceOutputIndex
+   *          Index of the physical output on the source task that produced the
+   *          event
+   * @param destinationTaskAndInputIndices
+   *          Map via which the routing information is returned
+   * @throws Exception
+   */
+  public void routeDataMovementEventToDestination(DataMovementEvent event,
+      int sourceTaskIndex, int sourceOutputIndex,
+      Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception {}
+
+  /**
+   * Return the routing information to inform consumers about the failure of a
+   * source task whose outputs have been potentially lost. The return map has
+   * the routing information. The failure notification event will be sent to
+   * every task index in the key of the map. Every physical input in the value
+   * for that task key will receive the failure notification. This method will
+   * be called once for every source task failure and information for all
+   * affected destinations must be provided in that invocation.
+   * 
+   * @param sourceTaskIndex
+   *          Source task
+   * @param destinationTaskAndInputIndices
+   *          Map via which the routing information is returned
+   * @throws Exception
+   */
+  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+      Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception {}
+
+  /**
+   * Return the source task index to which to send the input error event
+   * 
+   * @param event
+   *          Input read error event. Has more information about the error
+   * @param destinationTaskIndex
+   *          Destination task that reported the error
+   * @param destinationFailedInputIndex
+   *          Index of the physical input on the destination task that reported 
+   *          the error
+   * @return Index of the source task that created the unavailable input
+   * @throws Exception
+   */
+  public int routeInputErrorEventToSource(InputReadErrorEvent event,
+      int destinationTaskIndex, int destinationFailedInputIndex) throws Exception { 
+    return routeInputErrorEventToSource(destinationTaskIndex, destinationFailedInputIndex);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
index b38fda3..c45d272 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
@@ -21,6 +21,7 @@ package org.apache.tez.runtime.api.events;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.tez.runtime.api.Event;
 
@@ -65,6 +66,26 @@ public class CompositeDataMovementEvent extends Event {
                                                   ByteBuffer userPayload) {
     return new CompositeDataMovementEvent(srcIndexStart, count, userPayload);
   }
+  
+  /**
+   * Expand the {@link CompositeDataMovementEvent} into a routable
+   * {@link DataMovementEvent} by providing the source output index and the
+   * target input index.
+   * 
+   * @param sourceIndex
+   *          The index of the physical output represented by the
+   *          {@link DataMovementEvent}
+   * @param targetIndex
+   *          The index of the physical input to which the given
+   *          {@link DataMovementEvent} should be routed.
+   * @return {@link DataMovementEvent} created from the
+   *         {@link CompositeDataMovementEvent} with indices specified by the
+   *         method parameters
+   */
+  @Private
+  public DataMovementEvent expand(int sourceIndex, int targetIndex) {
+    return new DataMovementEvent(sourceIndex, targetIndex, version, userPayload);
+  }
 
   public int getSourceIndexStart() {
     return sourceIndexStart;

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
index b9c1cc4..05c3d3f 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Output;
 
 /**
  * Event used by user code to send information between tasks. An output can
@@ -56,14 +57,8 @@ public final class DataMovementEvent extends Event {
   private int version;
 
 
-  private DataMovementEvent(int sourceIndex,
-                            ByteBuffer userPayload) {
-    this.userPayload = userPayload;
-    this.sourceIndex = sourceIndex;
-  }
-
   @Private
-  private DataMovementEvent(int sourceIndex,
+  DataMovementEvent(int sourceIndex,
                             int targetIndex,
                             int version,
                             ByteBuffer userPayload) {
@@ -74,20 +69,21 @@ public final class DataMovementEvent extends Event {
   }
 
   private DataMovementEvent(ByteBuffer userPayload) {
-    this(-1, userPayload);
+    this(-1, -1, -1, userPayload);
   }
 
   /**
-   * User Event constructor
+   * User Event constructor for {@link Output}s
    * @param sourceIndex Index to identify the physical edge of the input/output
    * that generated the event
    * @param userPayload User Payload of the User Event
    */
   public static DataMovementEvent create(int sourceIndex,
                                          ByteBuffer userPayload) {
-    return new DataMovementEvent(sourceIndex, userPayload);
+    return new DataMovementEvent(sourceIndex, -1, -1, userPayload);
   }
-
+  
+  @Private
   /**
    * Constructor for Processor-generated User Events
    * @param userPayload
@@ -103,6 +99,21 @@ public final class DataMovementEvent extends Event {
                                          ByteBuffer userPayload) {
     return new DataMovementEvent(sourceIndex, targetIndex, version, userPayload);
   }
+  
+  /**
+   * Make a routable copy of the {@link DataMovementEvent} by adding a target
+   * input index
+   * 
+   * @param targetIndex
+   *          The index of the physical input to which this
+   *          {@link DataMovementEvent} should be routed
+   * @return Copy of this {@link DataMovementEvent} with the target input index
+   *         added to it
+   */
+  @Private
+  public DataMovementEvent makeCopy(int targetIndex) {
+    return new DataMovementEvent(sourceIndex, targetIndex, version, userPayload);
+  }
 
   public ByteBuffer getUserPayload() {
     return userPayload == null ? null : userPayload.asReadOnlyBuffer();

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
index 639d0b9..9d8363a 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
@@ -55,6 +55,22 @@ public class InputFailedEvent extends Event{
   public static InputFailedEvent create(int targetIndex, int version) {
     return new InputFailedEvent(targetIndex, version);
   }
+  
+  /**
+   * Create a copy of the {@link InputFailedEvent} by adding a target input
+   * index, i.e. the index of the physical input to which this event should be routed.
+   * 
+   * @param targetIndex
+   *          The index of the physical input to which this
+   *          {@link InputFailedEvent} should be routed
+   * 
+   * @return copy of the {@link InputFailedEvent} with the target input index
+   *         added
+   */
+  @Private
+  public InputFailedEvent makeCopy(int targetIndex) {
+    return create(targetIndex, version);
+  }
 
   public int getTargetIndex() {
     return targetIndex;

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index 233f76c..57c0aca 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -118,7 +118,10 @@
   <!-- TEZ-1952 -->
   <Match>
     <Class name="org.apache.tez.dag.app.dag.impl.Edge"/>
-    <Field name="edgeProperty"/>
+    <Or>
+      <Field name="edgeProperty"/>
+      <Field name="onDemandRouting"/>
+    </Or>
     <Bug pattern="IS2_INCONSISTENT_SYNC"/>
   </Match>
 

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java
new file mode 100644
index 0000000..49ff044
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptEventInfo.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import java.util.List;
+
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+public class TaskAttemptEventInfo {
+  private final int nextFromEventId;
+  private final List<TezEvent> events;
+  
+  public TaskAttemptEventInfo(int nextFromEventId, List<TezEvent> events) {
+    this.nextFromEventId = nextFromEventId;
+    this.events = events;
+  }
+  
+  public int getNextFromEventId() {
+    return nextFromEventId;
+  }
+  
+  public List<TezEvent> getEvents() {
+    return events;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index b38081b..970489d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -444,13 +444,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
               new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents)));
         }
         taskHeartbeatHandler.pinged(taskAttemptID);
-        List<TezEvent> outEvents = context
+        TaskAttemptEventInfo eventInfo = context
             .getCurrentDAG()
             .getVertex(taskAttemptID.getTaskID().getVertexID())
-            .getTask(taskAttemptID.getTaskID())
             .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(),
                 request.getMaxEvents());
-        response.setEvents(outEvents);
+        response.setEvents(eventInfo.getEvents());
+        response.setNextFromEventId(eventInfo.getNextFromEventId());
       }
       containerInfo.lastRequestId = requestId;
       containerInfo.lastReponse = response;

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 77ef6e0..bb42392 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -45,10 +45,12 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.api.client.ProgressBuilder;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptEventInfo;
 import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
 import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
 import org.apache.tez.dag.app.dag.impl.Edge;
 import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.OutputCommitter;
@@ -144,6 +146,9 @@ public interface Vertex extends Comparable<Vertex> {
   void scheduleSpeculativeTask(TezTaskID taskId);
   Resource getTaskResource();
   
+  public TaskAttemptEventInfo getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
+      int fromEventId, int maxEvents);
+  
   void handleSpeculatorEvent(SpeculatorEvent event);
 
   ProcessorDescriptor getProcessorDescriptor();

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
index db57227..d14527d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
@@ -22,12 +22,14 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.tez.dag.api.EdgeManagerPlugin;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
-public class BroadcastEdgeManager extends EdgeManagerPlugin {
+public class BroadcastEdgeManager extends EdgeManagerPluginOnDemand {
+
+  EventRouteMetadata[] commonRouteMeta;
 
   public BroadcastEdgeManager(EdgeManagerPluginContext context) {
     super(context);
@@ -60,6 +62,35 @@ public class BroadcastEdgeManager extends EdgeManagerPlugin {
   }
   
   @Override
+  public void prepareForRouting() throws Exception {
+    int numSourceTasks = getContext().getSourceVertexNumTasks();
+    commonRouteMeta = new EventRouteMetadata[numSourceTasks];
+    for (int i=0; i<numSourceTasks; ++i) {
+      commonRouteMeta[i] = EventRouteMetadata.create(1, new int[]{i}, new int[]{0});
+    }
+  }
+  
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(
+      int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex)
+      throws Exception {
+    return commonRouteMeta[sourceTaskIndex];
+  }
+  
+  @Override
+  public EventRouteMetadata routeCompositeDataMovementEventToDestination(
+      int sourceTaskIndex, int destinationTaskIndex)
+      throws Exception {
+    return commonRouteMeta[sourceTaskIndex];
+  }
+
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+      int sourceTaskIndex, int destinationTaskIndex) throws Exception {
+    return commonRouteMeta[sourceTaskIndex];
+  }
+
+  @Override
   public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
       Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
     List<Integer> inputIndices = 
@@ -71,6 +102,12 @@ public class BroadcastEdgeManager extends EdgeManagerPlugin {
   }
 
   @Override
+  public int routeInputErrorEventToSource(int destinationTaskIndex, int destinationFailedInputIndex)
+      throws Exception {
+    return destinationFailedInputIndex;
+  }
+
+  @Override
   public int routeInputErrorEventToSource(InputReadErrorEvent event,
       int destinationTaskIndex, int destinationFailedInputIndex) {
     return destinationFailedInputIndex;

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index f5fef67..78bab05 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app.dag.impl;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
@@ -30,9 +31,11 @@ import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.EdgeManagerPlugin;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
@@ -97,7 +100,9 @@ public class Edge {
 
   private EdgeProperty edgeProperty;
   private EdgeManagerPluginContext edgeManagerContext;
-  private EdgeManagerPlugin edgeManager;
+  @VisibleForTesting
+  EdgeManagerPlugin edgeManager;
+  private boolean onDemandRouting = false;
   @SuppressWarnings("rawtypes")
   private EventHandler eventHandler;
   private AtomicBoolean bufferEvents = new AtomicBoolean(false);
@@ -106,6 +111,9 @@ public class Edge {
   private Vertex sourceVertex;
   private Vertex destinationVertex; // this may end up being a list for shared edge
   private EventMetaData destinationMetaInfo;
+  private boolean routingNeeded = true;
+  private final ConcurrentMap<TezTaskAttemptID, PendingEventRouteMetadata> pendingEvents = Maps
+      .newConcurrentMap();
 
   @SuppressWarnings("rawtypes")
   public Edge(EdgeProperty edgeProperty, EventHandler eventHandler) {
@@ -188,6 +196,29 @@ public class Edge {
             edgeProperty.getEdgeDestination());
     setEdgeProperty(modifiedEdgeProperty);
   }
+  
+  public synchronized boolean routingToBegin() throws AMUserCodeException {
+    if (edgeManagerContext.getDestinationVertexNumTasks() == 0) {
+      routingNeeded = false;
+    } else if (edgeManagerContext.getDestinationVertexNumTasks() < 0) {
+      throw new TezUncheckedException(
+          "Internal error. Not expected to route events to a destination until parallelism is determined" +
+          " sourceVertex=" + sourceVertex.getLogIdentifier() +
+          " edgeManager=" + edgeManager.getClass().getName());
+    }
+    if (edgeManager instanceof EdgeManagerPluginOnDemand) {
+      onDemandRouting = true;
+      try {
+        ((EdgeManagerPluginOnDemand)edgeManager).prepareForRouting();
+      } catch (Exception e) {
+        throw new AMUserCodeException(Source.EdgeManager,
+            "Fail to prepareForRouting " + getEdgeInfo(), e);
+      }
+    }
+    
+    LOG.info("Routing to begin for edge: " + getEdgeInfo() + ". EdgeProperty: " + edgeProperty);
+    return onDemandRouting;
+  }
 
   public synchronized EdgeProperty getEdgeProperty() {
     return this.edgeProperty;
@@ -280,8 +311,13 @@ public class Edge {
         int srcTaskIndex;
         int numConsumers;
         try {
-          srcTaskIndex = edgeManager.routeInputErrorEventToSource(event,
-              destTaskIndex, event.getIndex());
+          if (onDemandRouting) {
+            srcTaskIndex = ((EdgeManagerPluginOnDemand) edgeManager).routeInputErrorEventToSource(
+                destTaskIndex, event.getIndex());            
+          } else {
+            srcTaskIndex = edgeManager.routeInputErrorEventToSource(event,
+                destTaskIndex, event.getIndex());
+          }
           Preconditions.checkArgument(srcTaskIndex >= 0,
               "SourceTaskIndex should not be negative,"
               + "srcTaskIndex=" + srcTaskIndex);
@@ -340,7 +376,6 @@ public class Edge {
     Preconditions.checkState(edgeManager != null, 
         "Edge Manager must be initialized by this time");
     Event event = tezEvent.getEvent();
-    boolean isFirstEvent = true;
     // cache of event object per input index
     Map<Integer, TezEvent> inputIndicesWithEvents = Maps.newHashMap(); 
     for (Map.Entry<Integer, List<Integer>> entry : taskAndInputIndices.entrySet()) {
@@ -350,28 +385,16 @@ public class Edge {
         Integer inputIndex = inputIndices.get(i);
         TezEvent tezEventToSend = inputIndicesWithEvents.get(inputIndex);
         if (tezEventToSend == null) {
-          if (isFirstEvent) {
-            isFirstEvent = false;
-            // this is the first item - reuse the event object
-            if (isDataMovementEvent) {
-              ((DataMovementEvent) event).setTargetIndex(inputIndex);
-            } else {
-              ((InputFailedEvent) event).setTargetIndex(inputIndex);
-            }
-            tezEventToSend = tezEvent;
+          Event e;
+          if (isDataMovementEvent) {
+            DataMovementEvent dmEvent = (DataMovementEvent) event;
+            e = DataMovementEvent.create(dmEvent.getSourceIndex(),
+                inputIndex, dmEvent.getVersion(), dmEvent.getUserPayload());
           } else {
-            // create new event object for this input index
-            Event e;
-            if (isDataMovementEvent) {
-              DataMovementEvent dmEvent = (DataMovementEvent) event;
-              e = DataMovementEvent.create(dmEvent.getSourceIndex(),
-                  inputIndex, dmEvent.getVersion(), dmEvent.getUserPayload());
-            } else {
-              InputFailedEvent ifEvent = ((InputFailedEvent) event);
-              e = InputFailedEvent.create(inputIndex, ifEvent.getVersion());
-            }
-            tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+            InputFailedEvent ifEvent = ((InputFailedEvent) event);
+            e = InputFailedEvent.create(inputIndex, ifEvent.getVersion());
           }
+          tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
           tezEventToSend.setDestinationInfo(destinationMetaInfo);
           // cache the event object per input because are unique per input index
           inputIndicesWithEvents.put(inputIndex, tezEventToSend);
@@ -392,8 +415,6 @@ public class Edge {
   }
   
   public void sendTezEventToDestinationTasks(TezEvent tezEvent) throws AMUserCodeException {
-    Preconditions.checkState(edgeManager != null, 
-        "Edge Manager must be initialized by this time");
     if (!bufferEvents.get()) {
       boolean isDataMovementEvent = true;
       switch (tezEvent.getEventType()) {
@@ -411,16 +432,7 @@ public class Edge {
             .getTaskAttemptID();
         int srcTaskIndex = srcAttemptId.getTaskID().getId();
 
-        boolean routingRequired = true;
-        if (edgeManagerContext.getDestinationVertexNumTasks() == 0) {
-          routingRequired = false;
-          LOG.info("Not routing events since destination vertex has 0 tasks" +
-              generateCommonDebugString(srcTaskIndex, tezEvent));
-        } else if (edgeManagerContext.getDestinationVertexNumTasks() < 0) {
-          throw new TezUncheckedException(
-              "Internal error. Not expected to route events to a destination until parallelism is determined" +
-                  generateCommonDebugString(srcTaskIndex, tezEvent));
-        }
+        boolean routingRequired = routingNeeded;
 
         if (routingRequired) {
           try {
@@ -439,6 +451,9 @@ public class Edge {
                 + ", sourceInfo:" + tezEvent.getSourceInfo() + ", destinationInfo:"
                 + tezEvent.getDestinationInfo() + ", " + getEdgeInfo(), e);
           }
+        } else {
+          LOG.info("Not routing events since destination vertex has 0 tasks" +
+              generateCommonDebugString(srcTaskIndex, tezEvent));
         }
 
         if (!destTaskAndInputIndices.isEmpty()) {
@@ -459,6 +474,163 @@ public class Edge {
     }
   }
   
+  static class PendingEventRouteMetadata {
+    private final EventRouteMetadata routeMeta;
+    private final TezEvent event;
+    private int numEventsRouted;
+    
+    public PendingEventRouteMetadata(EventRouteMetadata routeMeta, TezEvent event,
+        int numEventsRouted) {
+      this.routeMeta = routeMeta;
+      this.event = event;
+      this.numEventsRouted = numEventsRouted;
+    }
+    
+    public EventRouteMetadata getRouteMeta() {
+      return routeMeta;
+    }
+    
+    public TezEvent getTezEvent() {
+      return event;
+    }
+    
+    public int getNumEventsRouted() {
+      return numEventsRouted;
+    }
+  }
+
+  public PendingEventRouteMetadata removePendingEvents(TezTaskAttemptID attemptID) {
+    return pendingEvents.remove(attemptID);
+  }
+  
+  // returns false if the event could be routed but ran out of space in the list
+  public boolean maybeAddTezEventForDestinationTask(TezEvent tezEvent, TezTaskAttemptID attemptID,
+      int srcTaskIndex, List<TezEvent> listToAdd, int listMaxSize, 
+      PendingEventRouteMetadata pendingRoutes) 
+          throws AMUserCodeException {
+    if (!routingNeeded) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not routing events since destination vertex has 0 tasks" +
+            generateCommonDebugString(srcTaskIndex, tezEvent));
+      }
+      return true;
+    } else {
+      try {
+        EdgeManagerPluginOnDemand edgeManagerOnDemand = (EdgeManagerPluginOnDemand) edgeManager;
+        int taskIndex = attemptID.getTaskID().getId();
+        switch (tezEvent.getEventType()) {
+        case COMPOSITE_DATA_MOVEMENT_EVENT:
+          {
+            CompositeDataMovementEvent compEvent = (CompositeDataMovementEvent) tezEvent.getEvent();        
+            EventRouteMetadata routeMeta;
+            int numEventsDone;
+            if (pendingRoutes != null) {
+              routeMeta = pendingRoutes.getRouteMeta();
+              numEventsDone = pendingRoutes.getNumEventsRouted();
+            } else {
+              routeMeta = edgeManagerOnDemand
+                  .routeCompositeDataMovementEventToDestination(srcTaskIndex, taskIndex);
+              numEventsDone = 0;
+            }
+            if (routeMeta != null) {
+              int listSize = listToAdd.size();
+              int numEvents = routeMeta.getNumEvents();
+              int[] sourceIndices = routeMeta.getSourceIndices();
+              int[] targetIndices = routeMeta.getTargetIndices();
+              while (numEventsDone < numEvents && listSize++ < listMaxSize) {
+                DataMovementEvent e = compEvent.expand(sourceIndices[numEventsDone],
+                    targetIndices[numEventsDone]);
+                numEventsDone++;
+                TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+                tezEventToSend.setDestinationInfo(destinationMetaInfo);
+                listToAdd.add(tezEventToSend);
+              }
+              if (numEventsDone < numEvents) {
+                pendingEvents.put(attemptID, new PendingEventRouteMetadata(routeMeta, tezEvent,
+                    numEventsDone));
+                return false;
+              }
+            }
+          }
+          break;
+        case INPUT_FAILED_EVENT:
+          {
+            InputFailedEvent ifEvent = (InputFailedEvent) tezEvent.getEvent();
+            EventRouteMetadata routeMeta;
+            int numEventsDone;
+            if (pendingRoutes != null) {
+              routeMeta = pendingRoutes.getRouteMeta();
+              numEventsDone = pendingRoutes.getNumEventsRouted();
+            } else {
+              routeMeta = edgeManagerOnDemand.routeInputSourceTaskFailedEventToDestination(
+                  srcTaskIndex, taskIndex);
+              numEventsDone = 0;
+            }
+            if (routeMeta != null) {
+              int listSize = listToAdd.size();
+              int numEvents = routeMeta.getNumEvents();
+              int[] targetIndices = routeMeta.getTargetIndices();
+              while (numEventsDone < numEvents && listSize++ < listMaxSize) {
+                InputFailedEvent e = ifEvent.makeCopy(targetIndices[numEventsDone]);
+                numEventsDone++;
+                TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+                tezEventToSend.setDestinationInfo(destinationMetaInfo);
+                listToAdd.add(tezEventToSend);
+              }
+              if (numEventsDone < numEvents) {
+                pendingEvents.put(attemptID, new PendingEventRouteMetadata(routeMeta, tezEvent,
+                    numEventsDone));
+                return false;
+              }
+            }
+          }
+          break;
+        case DATA_MOVEMENT_EVENT:
+          {
+            DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
+            EventRouteMetadata routeMeta;
+            int numEventsDone;
+            if (pendingRoutes != null) {
+              routeMeta = pendingRoutes.getRouteMeta();
+              numEventsDone = pendingRoutes.getNumEventsRouted();
+            } else {
+              routeMeta = edgeManagerOnDemand.routeDataMovementEventToDestination(srcTaskIndex,
+                  dmEvent.getSourceIndex(), taskIndex);
+              numEventsDone = 0;
+            }
+            if (routeMeta != null) {
+              int listSize = listToAdd.size();
+              int numEvents = routeMeta.getNumEvents();
+              int[] targetIndices = routeMeta.getTargetIndices();
+              while (numEventsDone < numEvents && listSize++ < listMaxSize) {
+                DataMovementEvent e = dmEvent.makeCopy(targetIndices[numEventsDone]);
+                numEventsDone++;
+                TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+                tezEventToSend.setDestinationInfo(destinationMetaInfo);
+                listToAdd.add(tezEventToSend);
+              }
+              if (numEventsDone < numEvents) {
+                pendingEvents.put(attemptID, new PendingEventRouteMetadata(routeMeta, tezEvent,
+                    numEventsDone));
+                return false;
+              }
+            }
+          }
+          break;
+        default:
+          throw new TezUncheckedException("Unhandled tez event type: "
+              + tezEvent.getEventType());
+        }
+      } catch (Exception e){
+        throw new AMUserCodeException(Source.EdgeManager,
+            "Fail to maybeAddTezEventForDestinationTask, event:" + tezEvent.getEvent()
+            + ", sourceInfo:" + tezEvent.getSourceInfo() + ", destinationInfo:"
+            + tezEvent.getDestinationInfo() + ", " + getEdgeInfo(), e);
+      }
+    }
+    return true;
+  }
+
   private void sendEventToTask(Task task, TezEvent tezEvent) {
     task.registerTezEvent(tezEvent);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
index 11a6483..6053806 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
@@ -21,18 +21,25 @@ package org.apache.tez.dag.app.dag.impl;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nullable;
 
-import org.apache.tez.dag.api.EdgeManagerPlugin;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
 import com.google.common.base.Preconditions;
 
-public class OneToOneEdgeManager extends EdgeManagerPlugin {
+public class OneToOneEdgeManager extends EdgeManagerPluginOnDemand {
 
   List<Integer> destinationInputIndices = 
       Collections.unmodifiableList(Collections.singletonList(0));
+  AtomicBoolean stateChecked = new AtomicBoolean(false);
+ 
+  final EventRouteMetadata commonRouteMeta = 
+      EventRouteMetadata.create(1, new int[]{0}, new int[]{0});
 
   public OneToOneEdgeManager(EdgeManagerPluginContext context) {
     super(context);
@@ -57,16 +64,42 @@ public class OneToOneEdgeManager extends EdgeManagerPlugin {
   public void routeDataMovementEventToDestination(DataMovementEvent event,
       int sourceTaskIndex, int sourceOutputIndex, 
       Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
-    // by the time routing is initiated all task counts must be determined and stable
-    Preconditions.checkState(getContext().getSourceVertexNumTasks() == getContext()
-        .getDestinationVertexNumTasks(), "1-1 source and destination task counts must match."
-        + " Destination: " + getContext().getDestinationVertexName() + " tasks: "
-        + getContext().getDestinationVertexNumTasks() + " Source: "
-        + getContext().getSourceVertexName() + " tasks: " + getContext().getSourceVertexNumTasks());
+    checkState();
     destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
   }
   
   @Override
+  public void prepareForRouting() throws Exception {
+    checkState();
+  }
+  
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(
+      int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex)
+      throws Exception {
+    if (sourceTaskIndex == destinationTaskIndex) {
+      return commonRouteMeta;
+    }
+    return null;
+  }
+  
+  @Override
+  public @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination(
+      int sourceTaskIndex, int destinationTaskIndex)
+      throws Exception {
+    if (sourceTaskIndex == destinationTaskIndex) {
+      return commonRouteMeta;
+    }
+    return null;
+  }
+
+  @Override
+  public @Nullable EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+      int sourceTaskIndex, int destinationTaskIndex) throws Exception {
+    return sourceTaskIndex == destinationTaskIndex ? commonRouteMeta : null; // 1-1: same index only
+  }
+
+  @Override
   public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
       Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
     destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
@@ -79,8 +112,26 @@ public class OneToOneEdgeManager extends EdgeManagerPlugin {
   }
   
   @Override
+  public int routeInputErrorEventToSource(int destinationTaskIndex, int destinationFailedInputIndex) {
+    return destinationTaskIndex;
+  }
+
+  @Override
   public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
     return 1;
   }
+  
+  private void checkState() {
+    if (stateChecked.get()) {
+      return;
+    }
+    // by the time routing is initiated all task counts must be determined and stable
+    Preconditions.checkState(getContext().getSourceVertexNumTasks() == getContext()
+        .getDestinationVertexNumTasks(), "1-1 source and destination task counts must match."
+        + " Destination: " + getContext().getDestinationVertexName() + " tasks: "
+        + getContext().getDestinationVertexNumTasks() + " Source: "
+        + getContext().getSourceVertexName() + " tasks: " + getContext().getSourceVertexNumTasks());
+    stateChecked.set(true);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
index e2608cd..3b66b8f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
@@ -18,18 +18,29 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
 
-import org.apache.tez.dag.api.EdgeManagerPlugin;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
-public class ScatterGatherEdgeManager extends EdgeManagerPlugin {
+public class ScatterGatherEdgeManager extends EdgeManagerPluginOnDemand {
+  
+  private AtomicReference<ArrayList<EventRouteMetadata>> commonRouteMeta = 
+      new AtomicReference<ArrayList<EventRouteMetadata>>();
+  private Object commonRouteMetaLock = new Object();
+  private int[][] sourceIndices;
+  private int[][] targetIndices;
 
   public ScatterGatherEdgeManager(EdgeManagerPluginContext context) {
     super(context);
@@ -53,6 +64,69 @@ public class ScatterGatherEdgeManager extends EdgeManagerPlugin {
     return physicalOutputs;
   }
 
+  private ArrayList<EventRouteMetadata> getOrCreateCommonRouteMeta() {
+    ArrayList<EventRouteMetadata> metaData = commonRouteMeta.get();
+    if (metaData == null) {
+      synchronized (commonRouteMetaLock) {
+        metaData = commonRouteMeta.get();
+        if (metaData == null) {
+          int numSourceTasks = getContext().getSourceVertexNumTasks();
+          ArrayList<EventRouteMetadata> localEventMeta = Lists
+              .newArrayListWithCapacity(numSourceTasks);
+          for (int i=0; i<numSourceTasks; ++i) {
+            localEventMeta.add(EventRouteMetadata.create(1, new int[]{i}, new int[]{0}));
+          }
+          Preconditions.checkState(commonRouteMeta.compareAndSet(null, localEventMeta));
+          metaData = commonRouteMeta.get();
+        }
+      }
+    }
+    return metaData;
+  }
+
+  private void createIndices() {
+    // source indices derive from num dest tasks (==partitions)
+    int numTargetTasks = getContext().getDestinationVertexNumTasks();
+    sourceIndices = new int[numTargetTasks][];
+    for (int i=0; i<numTargetTasks; ++i) {
+      sourceIndices[i] = new int[]{i};
+    }
+    // target indices derive from num src tasks
+    int numSourceTasks = getContext().getSourceVertexNumTasks();
+    targetIndices = new int[numSourceTasks][];
+    for (int i=0; i<numSourceTasks; ++i) {
+      targetIndices[i] = new int[]{i};
+    }
+  }
+  
+  @Override
+  public void prepareForRouting() throws Exception {
+    createIndices();    
+  }
+
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(
+      int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex) throws Exception {
+    if (sourceOutputIndex == destinationTaskIndex) {
+      return getOrCreateCommonRouteMeta().get(sourceTaskIndex);
+    }
+    return null;
+  }
+  
+  @Override
+  public @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination(
+      int sourceTaskIndex, int destinationTaskIndex)
+      throws Exception {
+    return EventRouteMetadata.create(1, targetIndices[sourceTaskIndex], 
+        sourceIndices[destinationTaskIndex]);
+  }
+
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+      int sourceTaskIndex, int destinationTaskIndex) throws Exception {
+    return getOrCreateCommonRouteMeta().get(sourceTaskIndex);
+  }
+
   @Override
   public void routeDataMovementEventToDestination(DataMovementEvent event,
       int sourceTaskIndex, int sourceOutputIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
@@ -76,6 +150,11 @@ public class ScatterGatherEdgeManager extends EdgeManagerPlugin {
   }
 
   @Override
+  public int routeInputErrorEventToSource(int destinationTaskIndex, int destinationFailedInputIndex) {
+    return destinationFailedInputIndex;
+  }
+
+  @Override
   public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
     return getContext().getDestinationVertexNumTasks();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/05f77fe2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 15382a8..2e884e7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1493,12 +1493,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
   
-  @Private
-  @VisibleForTesting
-  public List<TezEvent> getTaskEvents() {
-    return tezEventsForTaskAttempts;
-  }
-
   private static class KillTransition
     implements SingleArcTransition<TaskImpl, TaskEvent> {
     @Override