You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/29 22:48:58 UTC

tez git commit: TEZ-2478. Move OneToOne routing to store events in Tasks. (Contributed by Bikas Saha and Siddharth Seth)

Repository: tez
Updated Branches:
  refs/heads/master fafa804c9 -> 413c3cc79


TEZ-2478. Move OneToOne routing to store events in Tasks. (Contributed by Bikas Saha and Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/413c3cc7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/413c3cc7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/413c3cc7

Branch: refs/heads/master
Commit: 413c3cc7991e810d6b49539d71ecc930e838238e
Parents: fafa804
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri May 29 13:48:31 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 29 13:48:31 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/api/TezConfiguration.java    |   7 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   4 +-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |  14 +-
 .../dag/app/dag/impl/OneToOneEdgeManager.java   |  52 +------
 .../dag/impl/OneToOneEdgeManagerOnDemand.java   | 137 +++++++++++++++++++
 .../apache/tez/dag/app/dag/impl/TestEdge.java   |  41 +++++-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |   2 +-
 .../tez/test/TestExceptionPropagation.java      |   7 +-
 9 files changed, 204 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/413c3cc7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6fb6051..4e5f0cf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2478. Move OneToOne routing to store events in Tasks.
   TEZ-2503. findbugs version isn't reported properly in test-patch report.
   TEZ-2198. Fix sorter spill counts.
   TEZ-1883. Change findbugs version to 3.x.

http://git-wip-us.apache.org/repos/asf/tez/blob/413c3cc7/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 99c7c9d..15b1333 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -322,6 +322,13 @@ public class TezConfiguration extends Configuration {
       "cancel.delegation.tokens.on.completion";
   public static final boolean TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION_DEFAULT = true;
 
+  @Private
+  @Unstable
+  @ConfigurationScope(Scope.DAG)
+  public static final String TEZ_AM_ONE_TO_ONE_ROUTING_USE_ON_DEMAND_ROUTING =
+      TEZ_AM_PREFIX + "one-to-one.routing.use.on-demand-routing";
+  public static final boolean TEZ_AM_ONE_TO_ONE_ROUTING_USE_ON_DEMAND_ROUTING_DEFAULT = false;
+
   /**
    * Int value. The number of threads used to listen to task heartbeat requests.
    * Expert level setting.

http://git-wip-us.apache.org/repos/asf/tez/blob/413c3cc7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 38da302..640cd7d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -191,7 +191,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   TezCounters fullCounters = null;
   private Set<TezVertexID> reRunningVertices = new HashSet<TezVertexID>();
 
-  public final Configuration dagConf;
+  private final Configuration dagConf;
   private final DAGPlan jobPlan;
   
   Map<String, LocalResource> localResources;
@@ -1478,7 +1478,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
       // edge manager may be also set via API when using custom edge type
       dag.edges.put(edgePlan.getId(),
-          new Edge(edgeProperty, dag.getEventHandler()));
+          new Edge(edgeProperty, dag.getEventHandler(), dagConf));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/413c3cc7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index f9cbede..ddccf8d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -104,7 +106,8 @@ public class Edge {
   EdgeManagerPlugin edgeManager;
   private boolean onDemandRouting = false;
   @SuppressWarnings("rawtypes")
-  private EventHandler eventHandler;
+  private final EventHandler eventHandler;
+  private final Configuration conf;
   private AtomicBoolean bufferEvents = new AtomicBoolean(false);
   private List<TezEvent> destinationEventBuffer = new ArrayList<TezEvent>();
   private List<TezEvent> sourceEventBuffer = new ArrayList<TezEvent>();
@@ -116,9 +119,10 @@ public class Edge {
       .newConcurrentMap();
 
   @SuppressWarnings("rawtypes")
-  public Edge(EdgeProperty edgeProperty, EventHandler eventHandler) {
+  public Edge(EdgeProperty edgeProperty, EventHandler eventHandler, Configuration conf) {
     this.edgeProperty = edgeProperty;
     this.eventHandler = eventHandler;
+    this.conf = conf;
     createEdgeManager();
   }
 
@@ -126,7 +130,11 @@ public class Edge {
     switch (edgeProperty.getDataMovementType()) {
       case ONE_TO_ONE:
         edgeManagerContext = new EdgeManagerPluginContextImpl(null);
-        edgeManager = new OneToOneEdgeManager(edgeManagerContext);
+        if (conf.getBoolean(TezConfiguration.TEZ_AM_ONE_TO_ONE_ROUTING_USE_ON_DEMAND_ROUTING, TezConfiguration.TEZ_AM_ONE_TO_ONE_ROUTING_USE_ON_DEMAND_ROUTING_DEFAULT)) {
+          edgeManager = new OneToOneEdgeManagerOnDemand(edgeManagerContext);
+        } else {
+          edgeManager = new OneToOneEdgeManager(edgeManagerContext);
+        }
         break;
       case BROADCAST:
         edgeManagerContext = new EdgeManagerPluginContextImpl(null);

http://git-wip-us.apache.org/repos/asf/tez/blob/413c3cc7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
index 6053806..dd38180 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
@@ -23,23 +23,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.annotation.Nullable;
-
+import org.apache.tez.dag.api.EdgeManagerPlugin;
 import org.apache.tez.dag.api.EdgeManagerPluginContext;
-import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
 import com.google.common.base.Preconditions;
 
-public class OneToOneEdgeManager extends EdgeManagerPluginOnDemand {
+public class OneToOneEdgeManager extends EdgeManagerPlugin {
 
-  List<Integer> destinationInputIndices = 
+  final List<Integer> destinationInputIndices =
       Collections.unmodifiableList(Collections.singletonList(0));
-  AtomicBoolean stateChecked = new AtomicBoolean(false);
- 
-  final EventRouteMetadata commonRouteMeta = 
-      EventRouteMetadata.create(1, new int[]{0}, new int[]{0});
+  final AtomicBoolean stateChecked = new AtomicBoolean(false);
 
   public OneToOneEdgeManager(EdgeManagerPluginContext context) {
     super(context);
@@ -69,37 +64,6 @@ public class OneToOneEdgeManager extends EdgeManagerPluginOnDemand {
   }
   
   @Override
-  public void prepareForRouting() throws Exception {
-    checkState();
-  }
-  
-  @Override
-  public EventRouteMetadata routeDataMovementEventToDestination(
-      int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex)
-      throws Exception {
-    if (sourceTaskIndex == destinationTaskIndex) {
-      return commonRouteMeta;
-    }
-    return null;
-  }
-  
-  @Override
-  public @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination(
-      int sourceTaskIndex, int destinationTaskIndex)
-      throws Exception {
-    if (sourceTaskIndex == destinationTaskIndex) {
-      return commonRouteMeta;
-    }
-    return null;
-  }
-
-  @Override
-  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
-      int sourceTaskIndex, int destinationTaskIndex) throws Exception {
-    return commonRouteMeta;
-  }
-
-  @Override
   public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
       Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
     destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
@@ -112,15 +76,10 @@ public class OneToOneEdgeManager extends EdgeManagerPluginOnDemand {
   }
   
   @Override
-  public int routeInputErrorEventToSource(int destinationTaskIndex, int destinationFailedInputIndex) {
-    return destinationTaskIndex;
-  }
-
-  @Override
   public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
     return 1;
   }
-  
+
   private void checkState() {
     if (stateChecked.get()) {
       return;
@@ -133,5 +92,4 @@ public class OneToOneEdgeManager extends EdgeManagerPluginOnDemand {
         + getContext().getSourceVertexName() + " tasks: " + getContext().getSourceVertexNumTasks());
     stateChecked.set(true);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/413c3cc7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java
new file mode 100644
index 0000000..84e7e66
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java
@@ -0,0 +1,137 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nullable;
+
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+
+import com.google.common.base.Preconditions;
+
+public class OneToOneEdgeManagerOnDemand extends EdgeManagerPluginOnDemand {
+
+  final List<Integer> destinationInputIndices =
+      Collections.unmodifiableList(Collections.singletonList(0));
+  final AtomicBoolean stateChecked = new AtomicBoolean(false);
+ 
+  final EventRouteMetadata commonRouteMeta = 
+      EventRouteMetadata.create(1, new int[]{0}, new int[]{0});
+
+  public OneToOneEdgeManagerOnDemand(EdgeManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() {
+    // Nothing to do.
+  }
+
+  @Override
+  public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
+    return 1;
+  }
+  
+  @Override
+  public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
+    return 1;
+  }
+  
+  @Override
+  public void routeDataMovementEventToDestination(DataMovementEvent event,
+      int sourceTaskIndex, int sourceOutputIndex, 
+      Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
+    checkState();
+    destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
+  }
+  
+  @Override
+  public void prepareForRouting() throws Exception {
+    checkState();
+  }
+  
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(
+      int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex)
+      throws Exception {
+    if (sourceTaskIndex == destinationTaskIndex) {
+      return commonRouteMeta;
+    }
+    return null;
+  }
+  
+  @Override
+  public @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination(
+      int sourceTaskIndex, int destinationTaskIndex)
+      throws Exception {
+    if (sourceTaskIndex == destinationTaskIndex) {
+      return commonRouteMeta;
+    }
+    return null;
+  }
+
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+      int sourceTaskIndex, int destinationTaskIndex) throws Exception {
+    return commonRouteMeta;
+  }
+
+  @Override
+  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+      Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
+    destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
+  }
+
+  @Override
+  public int routeInputErrorEventToSource(InputReadErrorEvent event,
+      int destinationTaskIndex, int destinationFailedInputIndex) {
+    return destinationTaskIndex;
+  }
+  
+  @Override
+  public int routeInputErrorEventToSource(int destinationTaskIndex, int destinationFailedInputIndex) {
+    return destinationTaskIndex;
+  }
+
+  @Override
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return 1;
+  }
+  
+  private void checkState() {
+    if (stateChecked.get()) {
+      return;
+    }
+    // by the time routing is initiated all task counts must be determined and stable
+    Preconditions.checkState(getContext().getSourceVertexNumTasks() == getContext()
+        .getDestinationVertexNumTasks(), "1-1 source and destination task counts must match."
+        + " Destination: " + getContext().getDestinationVertexName() + " tasks: "
+        + getContext().getDestinationVertexNumTasks() + " Source: "
+        + getContext().getSourceVertexName() + " tasks: " + getContext().getSourceVertexNumTasks());
+    stateChecked.set(true);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/413c3cc7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
index 5718b17..eb03d1e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
@@ -53,6 +53,7 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
@@ -104,6 +105,36 @@ public class TestEdge {
         .get(0).intValue());
   }
 
+  @Test (timeout = 5000)
+  public void testOneToOneEdgeManagerODR() {
+    EdgeManagerPluginContext mockContext = mock(EdgeManagerPluginContext.class);
+    when(mockContext.getSourceVertexName()).thenReturn("Source");
+    when(mockContext.getDestinationVertexName()).thenReturn("Destination");
+    when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
+    OneToOneEdgeManagerOnDemand manager = new OneToOneEdgeManagerOnDemand(mockContext);
+    manager.initialize();
+    Map<Integer, List<Integer>> destinationTaskAndInputIndices = Maps.newHashMap();
+    DataMovementEvent event = DataMovementEvent.create(1, null);
+
+    // fail when source and destination are inconsistent
+    when(mockContext.getDestinationVertexNumTasks()).thenReturn(4);
+    try {
+      manager.routeDataMovementEventToDestination(event, 1, 1, destinationTaskAndInputIndices);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("1-1 source and destination task counts must match"));
+    }
+
+    // now make it consistent
+    when(mockContext.getDestinationVertexNumTasks()).thenReturn(3);
+    manager.routeDataMovementEventToDestination(event, 1, 1, destinationTaskAndInputIndices);
+    Assert.assertEquals(1, destinationTaskAndInputIndices.size());
+    Assert.assertEquals(1, destinationTaskAndInputIndices.entrySet().iterator().next().getKey()
+        .intValue());
+    Assert.assertEquals(0, destinationTaskAndInputIndices.entrySet().iterator().next().getValue()
+        .get(0).intValue());
+  }
+
   @Test(timeout = 5000)
   public void testScatterGatherManager() {
     EdgeManagerPluginContext mockContext = mock(EdgeManagerPluginContext.class);
@@ -132,7 +163,7 @@ public class TestEdge {
     EdgeProperty edgeProp = EdgeProperty.create(DataMovementType.SCATTER_GATHER,
         DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, mock(OutputDescriptor.class),
         mock(InputDescriptor.class));
-    Edge edge = new Edge(edgeProp, eventHandler);
+    Edge edge = new Edge(edgeProp, eventHandler, new TezConfiguration());
     
     TezVertexID srcVertexID = createVertexID(1);
     TezVertexID destVertexID = createVertexID(2);
@@ -246,7 +277,7 @@ public class TestEdge {
         DataSourceType.PERSISTED,
         SchedulingType.SEQUENTIAL,
         OutputDescriptor.create(""),
-        InputDescriptor.create("")), mockEventHandler);
+        InputDescriptor.create("")), mockEventHandler, new TezConfiguration());
     TezVertexID v1Id = createVertexID(1);
     TezVertexID v2Id = createVertexID(2);
     edge.setSourceVertex(mockVertex("v1", v1Id, new LinkedHashMap<TezTaskID, Task>()));
@@ -270,7 +301,7 @@ public class TestEdge {
         DataSourceType.PERSISTED,
         SchedulingType.SEQUENTIAL,
         OutputDescriptor.create(""),
-        InputDescriptor.create("")), mockEventHandler);
+        InputDescriptor.create("")), mockEventHandler, new TezConfiguration());
     TezVertexID v1Id = createVertexID(1);
     TezVertexID v2Id = createVertexID(2);
     edge.setSourceVertex(mockVertex("v1", v1Id, new LinkedHashMap<TezTaskID, Task>()));
@@ -294,7 +325,7 @@ public class TestEdge {
         DataSourceType.PERSISTED,
         SchedulingType.SEQUENTIAL,
         OutputDescriptor.create(""),
-        InputDescriptor.create("")), mockEventHandler);
+        InputDescriptor.create("")), mockEventHandler, new TezConfiguration());
     TezVertexID v1Id = createVertexID(1);
     TezVertexID v2Id = createVertexID(2);
     edge.setSourceVertex(mockVertex("v1", v1Id, new LinkedHashMap<TezTaskID, Task>()));
@@ -321,7 +352,7 @@ public class TestEdge {
         DataSourceType.PERSISTED,
         SchedulingType.SEQUENTIAL,
         OutputDescriptor.create(""),
-        InputDescriptor.create("")), mockEventHandler);
+        InputDescriptor.create("")), mockEventHandler, new TezConfiguration());
     TezVertexID v1Id = createVertexID(1);
     TezVertexID v2Id = createVertexID(2);
     edge.setSourceVertex(mockVertex("v1", v1Id, new LinkedHashMap<TezTaskID, Task>()));

http://git-wip-us.apache.org/repos/asf/tez/blob/413c3cc7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index e569949..0176b79 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2217,7 +2217,7 @@ public class TestVertexImpl {
     for (EdgePlan edgePlan : dagPlan.getEdgeList()) {
       EdgeProperty edgeProperty = DagTypeConverters
           .createEdgePropertyMapFromDAGPlan(edgePlan);
-      edges.put(edgePlan.getId(), new Edge(edgeProperty, dispatcher.getEventHandler()));
+      edges.put(edgePlan.getId(), new Edge(edgeProperty, dispatcher.getEventHandler(), conf));
     }
 
     parseVertexEdges();

http://git-wip-us.apache.org/repos/asf/tez/blob/413c3cc7/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
index 7ba6028..49bb9f5 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.tez.dag.app.dag.impl.OneToOneEdgeManagerOnDemand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -64,7 +65,6 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
-import org.apache.tez.dag.app.dag.impl.OneToOneEdgeManager;
 import org.apache.tez.dag.app.dag.impl.RootInputVertexManager;
 import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
@@ -154,6 +154,7 @@ public class TestExceptionPropagation {
     tezConf
         .setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
     tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_ONE_TO_ONE_ROUTING_USE_ON_DEMAND_ROUTING, true);
     tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
     tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
     tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
@@ -753,7 +754,7 @@ public class TestExceptionPropagation {
   }
 
   // EdgeManager for edge linking vertex1 and vertex2
-  public static class CustomEdgeManager extends OneToOneEdgeManager {
+  public static class CustomEdgeManager extends OneToOneEdgeManagerOnDemand {
 
     private ExceptionLocation exLocation;
 
@@ -822,7 +823,7 @@ public class TestExceptionPropagation {
       }
       super.prepareForRouting();
     }
-    
+
     @Override
     public EventRouteMetadata routeDataMovementEventToDestination(
         int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex) throws Exception {