You are viewing a plain-text version of this content. The original page linked to the canonical version ("here"), but that hyperlink is not preserved in this plain-text copy.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/18 07:03:52 UTC

git commit: TEZ-801. Support routing of event to multiple destination physical inputs (bikas)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 4fd601e59 -> 06bc6eabe


TEZ-801. Support routing of event to multiple destination physical inputs (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/06bc6eab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/06bc6eab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/06bc6eab

Branch: refs/heads/master
Commit: 06bc6eabe48e26e0195864b7672adf2cab9311aa
Parents: 4fd601e
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Feb 17 22:03:41 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Feb 17 22:03:41 2014 -0800

----------------------------------------------------------------------
 .../org/apache/tez/dag/api/EdgeManager.java     |  75 ++++++++------
 .../runtime/api/events/InputFailedEvent.java    |  10 +-
 .../dag/app/dag/impl/BroadcastEdgeManager.java  |  33 +++---
 .../org/apache/tez/dag/app/dag/impl/Edge.java   | 103 +++++++++++++------
 .../tez/dag/app/dag/impl/NullEdgeManager.java   |  19 ++--
 .../dag/app/dag/impl/OneToOneEdgeManager.java   |  38 +++----
 .../app/dag/impl/ScatterGatherEdgeManager.java  |  32 +++---
 .../org/apache/tez/test/EdgeManagerForTest.java |  21 ++--
 .../vertexmanager/ShuffleVertexManager.java     |  65 +++++++-----
 .../vertexmanager/TestShuffleVertexManager.java |  31 ++++--
 10 files changed, 268 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
index d855f6c..e7beec1 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
@@ -19,9 +19,10 @@
 package org.apache.tez.dag.api;
 
 import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
 /**
@@ -31,6 +32,7 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
  * 
  * Implementations must provide a 0 argument public constructor.
  */
+@Evolving
 public interface EdgeManager {
   
   /**
@@ -49,63 +51,78 @@ public interface EdgeManager {
   public void initialize(EdgeManagerContext edgeManagerContext);
   
   /**
-   * Get the number of inputs on the destination task
+   * Get the number of physical inputs on the destination task
    * @param numSourceTasks Total number of source tasks
    * @param destinationTaskIndex Index of destination task for which number of 
    * inputs is needed
-   * @return Number of inputs on the destination task
+   * @return Number of physical inputs on the destination task
    */
-  public int getNumDestinationTaskInputs(int numSourceTasks, 
+  public int getNumDestinationTaskPhysicalInputs(int numSourceTasks, 
       int destinationTaskIndex);
 
   /**
-   * Get the number of outputs on the source task
+   * Get the number of physical outputs on the source task
    * @param numDestinationTasks Total number of destination tasks
    * @param sourceTaskIndex Index of the source task for which number of outputs 
    * is needed
-   * @return Number of outputs on the source task
+   * @return Number of physical outputs on the source task
    */
-  public int getNumSourceTaskOutputs(int numDestinationTasks, 
+  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, 
       int sourceTaskIndex);
   
   /**
-   * Return the destination task indeces that need to be sent an input available 
-   * event because the source task output is now available
-   * @param event Data movement event
-   * @param sourceTaskIndex Source task
-   * @param numDestinationTasks Total number of destination tasks
-   * @param taskIndices List into which the destination task indices is to be 
-   * returned
+   * Return the routing information to inform consumers about the source task
+   * output that is now available. The return Map has the routing information.
+   * Key is the destination task physical input index and the value is the list
+   * of destination task indices for which the key input index will receive the
+   * data movement event.
+   * @param event
+   *          Data movement event
+   * @param sourceTaskIndex
+   *          Source task
+   * @param numDestinationTasks
+   *          Total number of destination tasks
+   * @param inputIndicesToTaskIndices
+   *          Map via which the routing information is returned
    */
-  public void routeEventToDestinationTasks(DataMovementEvent event,
-      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices);
+  public void routeDataMovementEventToDestination(DataMovementEvent event,
+      int sourceTaskIndex, int numDestinationTasks,
+      Map<Integer, List<Integer>> inputIndicesToTaskIndices);
   
   /**
-   * Return the destination task indeces that need to be sent an input failed 
-   * event because the source task output is no longer available
-   * @param event Input failed event
-   * @param sourceTaskIndex Failed source task
-   * @param numDestinationTasks Total number of destination tasks
-   * @param taskIndices List into which the destination task indices is to be 
-   * returned
+   * Return the routing information to inform consumers about the failure of a
+   * source task whose outputs have been potentially lost. The return Map has
+   * the routing information. Key is the destination task physical input index
+   * and the value is the list of destination task indices for which the key
+   * input index will receive the input failure notification. This method will
+   * be called once for every source task failure and information for all
+   * affected destinations must be provided in that invocation.
+   * 
+   * @param sourceTaskIndex
+   *          Source task
+   * @param numDestinationTasks
+   *          Total number of destination tasks
+   * @param inputIndicesToTaskIndices
+   *          Map via which the routing information is returned
    */
-  public void routeEventToDestinationTasks(InputFailedEvent event,
-      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices);
+  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+      int numDestinationTasks,
+      Map<Integer, List<Integer>> inputIndicesToTaskIndices);
 
   /**
    * Get the number of destination tasks that consume data from the source task
    * @param sourceTaskIndex Source task index
    * @param numDestinationTasks Total number of destination tasks
    */
-  public int getDestinationConsumerTaskNumber(int sourceTaskIndex, int numDestinationTasks);
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks);
   
   /**
    * Return the source task index to which to send the input error event
-   * @param destinationTaskIndex Destination task that reported the error
    * @param event Input read error event. Has more information about the error
+   * @param destinationTaskIndex Destination task that reported the error
    * @return Index of the source task that created the unavailable input
    */
-  public int routeEventToSourceTasks(int destinationTaskIndex,
-      InputReadErrorEvent event);
+  public int routeInputErrorEventToSource(InputReadErrorEvent event,
+      int destinationTaskIndex);
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
index fb44462..a4648f3 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputFailedEvent.java
@@ -46,11 +46,11 @@ public class InputFailedEvent extends Event{
    */
   private int version;
   
-  @Private
+  @Private // for Writable
   public InputFailedEvent() {
   }
   
-  @Private // for Writable
+  @Private
   public InputFailedEvent(int sourceIndex,
       int targetIndex,
       int version) {
@@ -58,6 +58,12 @@ public class InputFailedEvent extends Event{
     this.targetIndex = targetIndex;
     this.version = version;
   }
+  
+  @Private
+  public InputFailedEvent(int targetIndex, int version) {
+    this.targetIndex = targetIndex;
+    this.version = version;
+  }
 
   public int getSourceIndex() {
     return sourceIndex;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
index 5e5314e..305e085 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
@@ -19,12 +19,14 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.tez.dag.api.EdgeManager;
 import org.apache.tez.dag.api.EdgeManagerContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
+
+import com.google.common.collect.Lists;
 
 public class BroadcastEdgeManager implements EdgeManager {
 
@@ -34,34 +36,37 @@ public class BroadcastEdgeManager implements EdgeManager {
   }
   
   @Override
-  public int getNumDestinationTaskInputs(int numSourceTasks, 
+  public int getNumDestinationTaskPhysicalInputs(int numSourceTasks, 
       int destinationTaskIndex) {
     return numSourceTasks;
   }
   
   @Override
-  public int getNumSourceTaskOutputs(int numDestinationTasks,
+  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks,
       int sourceTaskIndex) {
     return 1;
   }
   
   @Override
-  public void routeEventToDestinationTasks(DataMovementEvent event,
-      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
-    event.setTargetIndex(sourceTaskIndex);
+  public void routeDataMovementEventToDestination(DataMovementEvent event,
+      int sourceTaskIndex, int numDestinationTasks, Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
+    List<Integer> taskIndices = Lists.newArrayListWithCapacity(numDestinationTasks);
     addAllDestinationTaskIndices(numDestinationTasks, taskIndices);
+    inputIndicesToTaskIndices.put(new Integer(sourceTaskIndex), taskIndices);
   }
   
   @Override
-  public void routeEventToDestinationTasks(InputFailedEvent event,
-      int sourceTaskIndex, int numDestinationTasks , List<Integer> taskIndices) {
-    event.setTargetIndex(sourceTaskIndex);
-    addAllDestinationTaskIndices(numDestinationTasks, taskIndices);    
+  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+      int numDestinationTasks,
+      Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
+    List<Integer> taskIndices = Lists.newArrayListWithCapacity(numDestinationTasks);
+    addAllDestinationTaskIndices(numDestinationTasks, taskIndices);
+    inputIndicesToTaskIndices.put(new Integer(sourceTaskIndex), taskIndices);
   }
-  
+
   @Override
-  public int routeEventToSourceTasks(int destinationTaskIndex,
-      InputReadErrorEvent event) {
+  public int routeInputErrorEventToSource(InputReadErrorEvent event,
+      int destinationTaskIndex) {
     return event.getIndex();
   }
   
@@ -72,7 +77,7 @@ public class BroadcastEdgeManager implements EdgeManager {
   }
 
   @Override
-  public int getDestinationConsumerTaskNumber(int sourceTaskIndex,
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex,
       int numDestTasks) {
     return numDestTasks;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 288a417..55ab86f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.dag.impl;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -46,6 +47,8 @@ import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 
+import com.google.common.collect.Maps;
+
 public class Edge {
 
   static class EdgeManagerContextImpl implements EdgeManagerContext {
@@ -163,13 +166,13 @@ public class Edge {
   public InputSpec getDestinationSpec(int destinationTaskIndex) {
     return new InputSpec(sourceVertex.getName(),
         edgeProperty.getEdgeDestination(),
-        edgeManager.getNumDestinationTaskInputs(sourceVertex.getTotalTasks(),
+        edgeManager.getNumDestinationTaskPhysicalInputs(sourceVertex.getTotalTasks(),
             destinationTaskIndex));
   }
 
   public OutputSpec getSourceSpec(int sourceTaskIndex) {
     return new OutputSpec(destinationVertex.getName(),
-        edgeProperty.getEdgeSource(), edgeManager.getNumSourceTaskOutputs(
+        edgeProperty.getEdgeSource(), edgeManager.getNumSourceTaskPhysicalOutputs(
             destinationVertex.getTotalTasks(), sourceTaskIndex));
   }
   
@@ -199,9 +202,9 @@ public class Edge {
         TezTaskAttemptID destAttemptId = tezEvent.getSourceInfo()
             .getTaskAttemptID();
         int destTaskIndex = destAttemptId.getTaskID().getId();
-        int srcTaskIndex = edgeManager.routeEventToSourceTasks(destTaskIndex,
-            event);
-        int numConsumers = edgeManager.getDestinationConsumerTaskNumber(
+        int srcTaskIndex = edgeManager.routeInputErrorEventToSource(event,
+            destTaskIndex);
+        int numConsumers = edgeManager.getNumDestinationConsumerTasks(
             srcTaskIndex, destinationVertex.getTotalTasks());
         Task srcTask = sourceVertex.getTask(srcTaskIndex);
         if (srcTask == null) {
@@ -239,9 +242,52 @@ public class Edge {
     }
   }
   
+  void sendDmEventOrIfEventToTasks(TezEvent tezEvent, int srcTaskIndex,
+      boolean isDataMovementEvent,
+      Map<Integer, List<Integer>> ifInputIndicesToTaskIndices) {
+    int num = 0;
+    Event event = tezEvent.getEvent();
+    for (Map.Entry<Integer, List<Integer>> entry : ifInputIndicesToTaskIndices.entrySet()) {
+      ++num;
+      TezEvent tezEventToSend = null;
+      if (num == ifInputIndicesToTaskIndices.size()) {
+        if (isDataMovementEvent) {
+          ((DataMovementEvent) event).setTargetIndex(entry.getKey().intValue());
+        } else {
+          ((InputFailedEvent) event).setTargetIndex(entry.getKey().intValue());
+        }
+        tezEventToSend = tezEvent;
+      } else {
+        Event e;
+        if (isDataMovementEvent) {
+          DataMovementEvent dmEvent = (DataMovementEvent) event;
+          e = new DataMovementEvent(dmEvent.getSourceIndex(), 
+              entry.getKey().intValue(), dmEvent.getVersion(), dmEvent.getUserPayload());
+        } else {
+          InputFailedEvent ifEvent = ((InputFailedEvent) event);
+          e = new InputFailedEvent(entry.getKey().intValue(), ifEvent.getVersion());
+        }
+        tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo());
+      }
+      tezEventToSend.setDestinationInfo(destinationMetaInfo);
+      for(Integer destTaskIndex : entry.getValue()) {
+        Task destTask = destinationVertex.getTask(destTaskIndex);
+        if (destTask == null) {
+          throw new TezUncheckedException("Unexpected null task." +
+              " sourceVertex=" + sourceVertex.getVertexId() +
+              " srcIndex = " + srcTaskIndex +
+              " destAttemptId=" + destinationVertex.getVertexId() +
+              " destIndex=" + destTaskIndex + 
+              " edgeManager=" + edgeManager.getClass().getName());
+        }
+        TezTaskID destTaskId = destTask.getTaskId();
+        sendEventToTask(destTaskId, tezEventToSend);
+      }
+    }
+  }
+  
   public void sendTezEventToDestinationTasks(TezEvent tezEvent) {
     if (!bufferEvents.get()) {
-      List<Integer> destTaskIndices = new ArrayList<Integer>();
       boolean isDataMovementEvent = true;
       switch (tezEvent.getEventType()) {
       case COMPOSITE_DATA_MOVEMENT_EVENT:
@@ -249,33 +295,32 @@ public class Edge {
         break;
       case INPUT_FAILED_EVENT:
         isDataMovementEvent = false;
+        // fall through
       case DATA_MOVEMENT_EVENT:
-        Event event = tezEvent.getEvent();
-        TezTaskAttemptID sourceAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
-        int sourceTaskIndex = sourceAttemptId.getTaskID().getId();
+        Map<Integer, List<Integer>> inputIndicesToTaskIndices = Maps
+        .newHashMap();
+        TezTaskAttemptID srcAttemptId = tezEvent.getSourceInfo()
+            .getTaskAttemptID();
+        int srcTaskIndex = srcAttemptId.getTaskID().getId();
         if (isDataMovementEvent) {
-          edgeManager.routeEventToDestinationTasks((DataMovementEvent) event,
-              sourceTaskIndex, destinationVertex.getTotalTasks(),
-              destTaskIndices);
+          DataMovementEvent dmEvent = (DataMovementEvent)tezEvent.getEvent();
+          edgeManager.routeDataMovementEventToDestination(dmEvent,
+                srcTaskIndex, destinationVertex.getTotalTasks(),
+                inputIndicesToTaskIndices);
+        } else {
+          edgeManager.routeInputSourceTaskFailedEventToDestination(srcTaskIndex,
+              destinationVertex.getTotalTasks(), inputIndicesToTaskIndices);
+        }
+        if (!inputIndicesToTaskIndices.isEmpty()) {
+          sendDmEventOrIfEventToTasks(tezEvent, srcTaskIndex, isDataMovementEvent, inputIndicesToTaskIndices);
         } else {
-          edgeManager.routeEventToDestinationTasks((InputFailedEvent) event,
-              sourceTaskIndex, destinationVertex.getTotalTasks(),
-              destTaskIndices);
+          throw new TezUncheckedException("Event must be routed." +
+              " sourceVertex=" + sourceVertex.getVertexId() +
+              " srcIndex = " + srcTaskIndex +
+              " destAttemptId=" + destinationVertex.getVertexId() +
+              " edgeManager=" + edgeManager.getClass().getName() + 
+              " Event type=" + tezEvent.getEventType());
         }
-        tezEvent.setDestinationInfo(destinationMetaInfo);
-        for(Integer destTaskIndex : destTaskIndices) {
-          Task destTask = destinationVertex.getTask(destTaskIndex);
-          if (destTask == null) {
-            throw new TezUncheckedException("Unexpected null task." +
-                " sourceVertex=" + sourceVertex.getVertexId() +
-                " srcIndex = " + sourceTaskIndex +
-                " destAttemptId=" + destinationVertex.getVertexId() +
-                " destIndex=" + destTaskIndex + 
-                " edgeManager=" + edgeManager.getClass().getName());
-          }
-          TezTaskID destTaskId = destTask.getTaskId();
-          sendEventToTask(destTaskId, tezEvent);
-        }        
         break;
       default:
         throw new TezUncheckedException("Unhandled tez event type: "

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java
index 7cde07b..02f5154 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/NullEdgeManager.java
@@ -19,11 +19,11 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.tez.dag.api.EdgeManager;
 import org.apache.tez.dag.api.EdgeManagerContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
 public class NullEdgeManager implements EdgeManager {
@@ -36,39 +36,40 @@ public class NullEdgeManager implements EdgeManager {
   }
 
   @Override
-  public int getNumDestinationTaskInputs(int numSourceTasks, int destinationTaskIndex) {
+  public int getNumDestinationTaskPhysicalInputs(int numSourceTasks, int destinationTaskIndex) {
     throw new UnsupportedOperationException(
         "Cannot route events. EdgeManager should have been replaced at runtime");
   }
 
   @Override
-  public int getNumSourceTaskOutputs(int numDestinationTasks, int sourceTaskIndex) {
+  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, int sourceTaskIndex) {
     throw new UnsupportedOperationException(
         "Cannot route events. EdgeManager should have been replaced at runtime");
   }
 
   @Override
-  public void routeEventToDestinationTasks(DataMovementEvent event, int sourceTaskIndex,
-      int numDestinationTasks, List<Integer> taskIndices) {
+  public void routeDataMovementEventToDestination(DataMovementEvent event, int sourceTaskIndex,
+      int numDestinationTasks, Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
     throw new UnsupportedOperationException(
         "Cannot route events. EdgeManager should have been replaced at runtime");
   }
 
   @Override
-  public void routeEventToDestinationTasks(InputFailedEvent event, int sourceTaskIndex,
-      int numDestinationTasks, List<Integer> taskIndices) {
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks) {
     throw new UnsupportedOperationException(
         "Cannot route events. EdgeManager should have been replaced at runtime");
   }
 
   @Override
-  public int getDestinationConsumerTaskNumber(int sourceTaskIndex, int numDestinationTasks) {
+  public int routeInputErrorEventToSource(InputReadErrorEvent event, int destinationTaskIndex) {
     throw new UnsupportedOperationException(
         "Cannot route events. EdgeManager should have been replaced at runtime");
   }
 
   @Override
-  public int routeEventToSourceTasks(int destinationTaskIndex, InputReadErrorEvent event) {
+  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+      int numDestinationTasks,
+      Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
     throw new UnsupportedOperationException(
         "Cannot route events. EdgeManager should have been replaced at runtime");
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
index e026db9..1de25b9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
@@ -18,13 +18,14 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.tez.dag.api.EdgeManager;
 import org.apache.tez.dag.api.EdgeManagerContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
 
 public class OneToOneEdgeManager implements EdgeManager {
 
@@ -34,43 +35,42 @@ public class OneToOneEdgeManager implements EdgeManager {
   }
 
   @Override
-  public int getNumDestinationTaskInputs(int numDestinationTasks, 
+  public int getNumDestinationTaskPhysicalInputs(int numDestinationTasks, 
       int destinationTaskIndex) {
     return 1;
   }
   
   @Override
-  public int getNumSourceTaskOutputs(int numDestinationTasks, 
+  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, 
       int sourceTaskIndex) {
     return 1;
   }
   
   @Override
-  public void routeEventToDestinationTasks(DataMovementEvent event,
-      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
-    event.setTargetIndex(0);
-    addDestinationTaskIndex(sourceTaskIndex, taskIndices);
+  public void routeDataMovementEventToDestination(DataMovementEvent event,
+      int sourceTaskIndex, int numDestinationTasks, 
+      Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
+    inputIndicesToTaskIndices.put(new Integer(0), 
+        Collections.singletonList(new Integer(sourceTaskIndex)));
   }
   
   @Override
-  public void routeEventToDestinationTasks(InputFailedEvent event,
-      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
-    event.setTargetIndex(0);
-    addDestinationTaskIndex(sourceTaskIndex, taskIndices);
+  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+      int numDestinationTasks,
+      Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
+    inputIndicesToTaskIndices.put(new Integer(0), 
+        Collections.singletonList(new Integer(sourceTaskIndex)));
   }
-  
+
   @Override
-  public int routeEventToSourceTasks(int destinationTaskIndex,
-      InputReadErrorEvent event) {
+  public int routeInputErrorEventToSource(InputReadErrorEvent event,
+      int destinationTaskIndex) {
     return destinationTaskIndex;
   }
   
-  void addDestinationTaskIndex(int sourceTaskIndex, List<Integer> taskIndeces) {
-    taskIndeces.add(new Integer(sourceTaskIndex));
-  }
-
   @Override
-  public int getDestinationConsumerTaskNumber(int sourceTaskIndex, int numDestTasks) {
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestTasks) {
     return 1;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
index c825a0a..9f4d61e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
@@ -18,13 +18,16 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.tez.dag.api.EdgeManager;
 import org.apache.tez.dag.api.EdgeManagerContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
+
+import com.google.common.collect.Lists;
 
 public class ScatterGatherEdgeManager implements EdgeManager {
 
@@ -34,39 +37,40 @@ public class ScatterGatherEdgeManager implements EdgeManager {
   }
 
   @Override
-  public int getNumDestinationTaskInputs(int numSourceTasks,
+  public int getNumDestinationTaskPhysicalInputs(int numSourceTasks,
       int destinationTaskIndex) {
     return numSourceTasks;
   }
   
   @Override
-  public int getNumSourceTaskOutputs(int numDestinationTasks, int sourceTaskIndex) {
+  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, int sourceTaskIndex) {
     return numDestinationTasks;
   }
 
   @Override
-  public void routeEventToDestinationTasks(DataMovementEvent event,
-      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
-    int destinationTaskIndex = event.getSourceIndex();
-    event.setTargetIndex(sourceTaskIndex);
-    taskIndices.add(new Integer(destinationTaskIndex));
+  public void routeDataMovementEventToDestination(DataMovementEvent event,
+      int sourceTaskIndex, int numDestinationTasks, Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
+    inputIndicesToTaskIndices.put(new Integer(sourceTaskIndex), 
+        Collections.singletonList(new Integer(event.getSourceIndex())));
   }
 
   @Override
-  public void routeEventToDestinationTasks(InputFailedEvent event,
-      int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
-    event.setTargetIndex(sourceTaskIndex);
+  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+      int numDestinationTasks,
+      Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
+    List<Integer> taskIndices = Lists.newArrayListWithCapacity(numDestinationTasks);
     addAllDestinationTaskIndices(numDestinationTasks, taskIndices);
+    inputIndicesToTaskIndices.put(new Integer(sourceTaskIndex), taskIndices);
   }
 
   @Override
-  public int routeEventToSourceTasks(int destinationTaskIndex,
-      InputReadErrorEvent event) {
+  public int routeInputErrorEventToSource(InputReadErrorEvent event,
+      int destinationTaskIndex) {
     return event.getIndex();
   }
 
   @Override
-  public int getDestinationConsumerTaskNumber(int sourceTaskIndex, int numDestTasks) {
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestTasks) {
     return numDestTasks;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
index ff8fdbe..7fbd4d8 100644
--- a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
+++ b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java
@@ -19,11 +19,11 @@
 package org.apache.tez.test;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.tez.dag.api.EdgeManager;
 import org.apache.tez.dag.api.EdgeManagerContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
 public class EdgeManagerForTest implements EdgeManager {
@@ -58,33 +58,34 @@ public class EdgeManagerForTest implements EdgeManager {
   }
 
   @Override
-  public int getNumDestinationTaskInputs(int numSourceTasks, int destinationTaskIndex) {
+  public int getNumDestinationTaskPhysicalInputs(int numSourceTasks, int destinationTaskIndex) {
     return 0;
   }
 
   @Override
-  public int getNumSourceTaskOutputs(int numDestinationTasks, int sourceTaskIndex) {
+  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, int sourceTaskIndex) {
     return 0;
   }
 
   @Override
-  public void routeEventToDestinationTasks(DataMovementEvent event, int sourceTaskIndex,
-      int numDestinationTasks, List<Integer> taskIndices) {
+  public void routeDataMovementEventToDestination(DataMovementEvent event, int sourceTaskIndex,
+      int numDestinationTasks, Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
   }
 
   @Override
-  public void routeEventToDestinationTasks(InputFailedEvent event, int sourceTaskIndex,
-      int numDestinationTasks, List<Integer> taskIndices) {
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks) {
+    return 0;
   }
 
   @Override
-  public int getDestinationConsumerTaskNumber(int sourceTaskIndex, int numDestinationTasks) {
+  public int routeInputErrorEventToSource(InputReadErrorEvent event, int destinationTaskIndex) {
     return 0;
   }
 
   @Override
-  public int routeEventToSourceTasks(int destinationTaskIndex, InputReadErrorEvent event) {
-    return 0;
+  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+      int numDestinationTasks,
+      Map<Integer, List<Integer>> inputIndicesToTaskIndices) { 
   }
   
   // End of overridden methods

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 0776cf0..70e9fae 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.library.vertexmanager;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -41,11 +42,11 @@ import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.protobuf.InvalidProtocolBufferException;
 
@@ -144,7 +145,7 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
     }
 
     @Override
-    public int getNumDestinationTaskInputs(int numSourceTasks, 
+    public int getNumDestinationTaskPhysicalInputs(int numSourceTasks, 
         int destinationTaskIndex) {
       int partitionRange = 1;
       if(destinationTaskIndex < numDestinationTasks-1) {
@@ -156,14 +157,14 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
     }
 
     @Override
-    public int getNumSourceTaskOutputs(int numDestinationTasks, 
+    public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, 
         int sourceTaskIndex) {
       return numSourceTaskOutputs;
     }
     
     @Override
-    public void routeEventToDestinationTasks(DataMovementEvent event,
-        int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
+    public void routeDataMovementEventToDestination(DataMovementEvent event,
+        int sourceTaskIndex, int numDestinationTasks, Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
       int sourceIndex = event.getSourceIndex();
       int destinationTaskIndex = sourceIndex/basePartitionRange;
       int partitionRange = 1;
@@ -178,32 +179,65 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
           sourceTaskIndex * partitionRange 
           + sourceIndex % partitionRange;
       
-      event.setTargetIndex(targetIndex);
-      taskIndices.add(new Integer(destinationTaskIndex));
+      inputIndicesToTaskIndices.put(new Integer(targetIndex), 
+          Collections.singletonList(new Integer(destinationTaskIndex)));
     }
-
+    
+    /**
+     * Reports, for every physical input index fed by the failed source task,
+     * the destination tasks that own that input. Destination tasks other than
+     * the last read basePartitionRange partitions per source task; the last
+     * one reads remainderRangeForLastShuffler partitions.
+     */
     @Override
-    public void routeEventToDestinationTasks(InputFailedEvent event,
-        int sourceTaskIndex, int numDestinationTasks, List<Integer> taskIndices) {
-      int sourceIndex = event.getSourceIndex();
-      int destinationTaskIndex = sourceIndex/basePartitionRange;
-      int partitionRange = 1;
-      if(destinationTaskIndex < numDestinationTasks-1) {
-        partitionRange = basePartitionRange;
+    public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex, 
+        int numDestinationTasks, 
+        Map<Integer, List<Integer>> inputIndicesToTaskIndices) {
+      if (remainderRangeForLastShuffler < basePartitionRange) {
+        // The two input index ranges below can overlap (e.g. both start at 0
+        // when sourceTaskIndex is 0). An overlapping index must reach both the
+        // other tasks and the last task, so merge instead of overwriting.
+        Integer lastTaskIndex = new Integer(numDestinationTasks-1);
+        List<Integer> lastTask = Collections.singletonList(lastTaskIndex);
+        List<Integer> otherTasks = Lists.newArrayListWithCapacity(numDestinationTasks-1);
+        for (int i=0; i<numDestinationTasks-1; ++i) {
+          otherTasks.add(new Integer(i));
+        }
+        
+        int startOffset = sourceTaskIndex * basePartitionRange;
+        for (int i=0; i<basePartitionRange; ++i) {
+          inputIndicesToTaskIndices.put(new Integer(startOffset+i), otherTasks);
+        }
+        int otherTasksEnd = startOffset + basePartitionRange;
+        int lastTaskOffset = sourceTaskIndex * remainderRangeForLastShuffler;
+        for (int i=0; i<remainderRangeForLastShuffler; ++i) {
+          int inputIndex = lastTaskOffset + i;
+          if (inputIndex >= startOffset && inputIndex < otherTasksEnd) {
+            // copy so the otherTasks list shared by other indices is untouched
+            List<Integer> bothTasks = Lists.newArrayListWithCapacity(numDestinationTasks);
+            bothTasks.addAll(otherTasks);
+            bothTasks.add(lastTaskIndex);
+            inputIndicesToTaskIndices.put(new Integer(inputIndex), bothTasks);
+          } else {
+            inputIndicesToTaskIndices.put(new Integer(inputIndex), lastTask);
+          }
+        }
       } else {
-        partitionRange = remainderRangeForLastShuffler;
+        // all tasks have same pattern
+        List<Integer> allTasks = Lists.newArrayListWithCapacity(numDestinationTasks);
+        for (int i=0; i<numDestinationTasks; ++i) {
+          allTasks.add(new Integer(i));
+        }
+        int startOffset = sourceTaskIndex * basePartitionRange;
+        for (int i=0; i<basePartitionRange; ++i) {
+          inputIndicesToTaskIndices.put(new Integer(startOffset+i), allTasks);
+        }
       }
-      int targetIndex = 
-          sourceTaskIndex * partitionRange 
-          + sourceIndex % partitionRange;
-      
-      event.setTargetIndex(targetIndex);
-      taskIndices.add(new Integer(destinationTaskIndex));
     }

     @Override
-    public int routeEventToSourceTasks(int destinationTaskIndex,
-        InputReadErrorEvent event) {
+    public int routeInputErrorEventToSource(InputReadErrorEvent event,
+        int destinationTaskIndex) {
       int partitionRange = 1;
       if(destinationTaskIndex < numDestinationTasks-1) {
         partitionRange = basePartitionRange;
@@ -214,7 +248,7 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
     }

     @Override
-    public int getDestinationConsumerTaskNumber(int sourceTaskIndex,
+    public int getNumDestinationConsumerTasks(int sourceTaskIndex,
         int numDestTasks) {
       return numDestTasks;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/06bc6eab/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 1a26487..fd11378 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -42,7 +42,7 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 import static org.mockito.Mockito.*;
 
@@ -198,16 +198,31 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(2, newEdgeManagers.size());
     
     EdgeManager edgeManager = newEdgeManagers.values().iterator().next();
-    List<Integer> targets = Lists.newArrayList();
+    Map<Integer, List<Integer>> targets = Maps.newHashMap();
     DataMovementEvent dmEvent = new DataMovementEvent(1, new byte[0]);
-    edgeManager.routeEventToDestinationTasks(dmEvent, 1, 2, targets);
-    Assert.assertEquals(3, dmEvent.getTargetIndex());
-    Assert.assertEquals(0, targets.get(0).intValue());
+    edgeManager.routeDataMovementEventToDestination(dmEvent, 1, 2, targets);
+    Assert.assertEquals(1, targets.size());
+    Map.Entry<Integer, List<Integer>> e = targets.entrySet().iterator().next();
+    Assert.assertEquals(3, e.getKey().intValue());
+    Assert.assertEquals(1, e.getValue().size());
+    Assert.assertEquals(0, e.getValue().get(0).intValue());
     targets.clear();
     dmEvent = new DataMovementEvent(2, new byte[0]);
-    edgeManager.routeEventToDestinationTasks(dmEvent, 0, 2, targets);
-    Assert.assertEquals(0, dmEvent.getTargetIndex());
-    Assert.assertEquals(1, targets.get(0).intValue());    
+    edgeManager.routeDataMovementEventToDestination(dmEvent, 0, 2, targets);
+    Assert.assertEquals(1, targets.size());
+    e = targets.entrySet().iterator().next();
+    Assert.assertEquals(0, e.getKey().intValue());
+    Assert.assertEquals(1, e.getValue().size());
+    Assert.assertEquals(1, e.getValue().get(0).intValue());
+    targets.clear();
+    edgeManager.routeInputSourceTaskFailedEventToDestination(2, 2, targets);
+    Assert.assertEquals(2, targets.size());
+    for (Map.Entry<Integer, List<Integer>> entry : targets.entrySet()) {
+      Assert.assertTrue(entry.getKey().intValue() == 4 || entry.getKey().intValue() == 5);
+      Assert.assertEquals(2, entry.getValue().size());
+      Assert.assertEquals(0, entry.getValue().get(0).intValue());
+      Assert.assertEquals(1, entry.getValue().get(1).intValue());
+    }
   }
   
   @SuppressWarnings({ "unchecked", "rawtypes" })