You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/29 22:54:44 UTC
tez git commit: TEZ-2478. Move OneToOne routing to store events in
Tasks. (Contributed by Bikas Saha and Siddharth Seth) (cherry picked from
commit 413c3cc7991e810d6b49539d71ecc930e838238e)
Repository: tez
Updated Branches:
refs/heads/branch-0.7 5679904fd -> 20c71fd4f
TEZ-2478. Move OneToOne routing to store events in Tasks. (Contributed by Bikas Saha and Siddharth Seth)
(cherry picked from commit 413c3cc7991e810d6b49539d71ecc930e838238e)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/20c71fd4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/20c71fd4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/20c71fd4
Branch: refs/heads/branch-0.7
Commit: 20c71fd4fc4cc548506705eb4518af52a201c38a
Parents: 5679904
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri May 29 13:48:31 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 29 13:54:38 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/api/TezConfiguration.java | 7 +
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 4 +-
.../org/apache/tez/dag/app/dag/impl/Edge.java | 14 +-
.../dag/app/dag/impl/OneToOneEdgeManager.java | 52 +------
.../dag/impl/OneToOneEdgeManagerOnDemand.java | 137 +++++++++++++++++++
.../apache/tez/dag/app/dag/impl/TestEdge.java | 41 +++++-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 2 +-
.../tez/test/TestExceptionPropagation.java | 7 +-
9 files changed, 204 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/20c71fd4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16e9b6c..8ad1c4a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2478. Move OneToOne routing to store events in Tasks.
TEZ-2482. Tez UI: Mouse events not working on IE11
TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster.
TEZ-2481. Tez UI: graphical view does not render properly on IE11
http://git-wip-us.apache.org/repos/asf/tez/blob/20c71fd4/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 99c7c9d..15b1333 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -322,6 +322,13 @@ public class TezConfiguration extends Configuration {
"cancel.delegation.tokens.on.completion";
public static final boolean TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION_DEFAULT = true;
+ @Private
+ @Unstable
+ @ConfigurationScope(Scope.DAG)
+ public static final String TEZ_AM_ONE_TO_ONE_ROUTING_USE_ON_DEMAND_ROUTING =
+ TEZ_AM_PREFIX + "one-to-one.routing.use.on-demand-routing";
+ public static final boolean TEZ_AM_ONE_TO_ONE_ROUTING_USE_ON_DEMAND_ROUTING_DEFAULT = false;
+
/**
* Int value. The number of threads used to listen to task heartbeat requests.
* Expert level setting.
http://git-wip-us.apache.org/repos/asf/tez/blob/20c71fd4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 38da302..640cd7d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -191,7 +191,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
TezCounters fullCounters = null;
private Set<TezVertexID> reRunningVertices = new HashSet<TezVertexID>();
- public final Configuration dagConf;
+ private final Configuration dagConf;
private final DAGPlan jobPlan;
Map<String, LocalResource> localResources;
@@ -1478,7 +1478,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// edge manager may be also set via API when using custom edge type
dag.edges.put(edgePlan.getId(),
- new Edge(edgeProperty, dag.getEventHandler()));
+ new Edge(edgeProperty, dag.getEventHandler(), dagConf));
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/20c71fd4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index f9cbede..ddccf8d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -24,6 +24,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -104,7 +106,8 @@ public class Edge {
EdgeManagerPlugin edgeManager;
private boolean onDemandRouting = false;
@SuppressWarnings("rawtypes")
- private EventHandler eventHandler;
+ private final EventHandler eventHandler;
+ private final Configuration conf;
private AtomicBoolean bufferEvents = new AtomicBoolean(false);
private List<TezEvent> destinationEventBuffer = new ArrayList<TezEvent>();
private List<TezEvent> sourceEventBuffer = new ArrayList<TezEvent>();
@@ -116,9 +119,10 @@ public class Edge {
.newConcurrentMap();
@SuppressWarnings("rawtypes")
- public Edge(EdgeProperty edgeProperty, EventHandler eventHandler) {
+ public Edge(EdgeProperty edgeProperty, EventHandler eventHandler, Configuration conf) {
this.edgeProperty = edgeProperty;
this.eventHandler = eventHandler;
+ this.conf = conf;
createEdgeManager();
}
@@ -126,7 +130,11 @@ public class Edge {
switch (edgeProperty.getDataMovementType()) {
case ONE_TO_ONE:
edgeManagerContext = new EdgeManagerPluginContextImpl(null);
- edgeManager = new OneToOneEdgeManager(edgeManagerContext);
+ if (conf.getBoolean(TezConfiguration.TEZ_AM_ONE_TO_ONE_ROUTING_USE_ON_DEMAND_ROUTING, TezConfiguration.TEZ_AM_ONE_TO_ONE_ROUTING_USE_ON_DEMAND_ROUTING_DEFAULT)) {
+ edgeManager = new OneToOneEdgeManagerOnDemand(edgeManagerContext);
+ } else {
+ edgeManager = new OneToOneEdgeManager(edgeManagerContext);
+ }
break;
case BROADCAST:
edgeManagerContext = new EdgeManagerPluginContextImpl(null);
http://git-wip-us.apache.org/repos/asf/tez/blob/20c71fd4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
index 6053806..dd38180 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
@@ -23,23 +23,18 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
-
+import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
-import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import com.google.common.base.Preconditions;
-public class OneToOneEdgeManager extends EdgeManagerPluginOnDemand {
+public class OneToOneEdgeManager extends EdgeManagerPlugin {
- List<Integer> destinationInputIndices =
+ final List<Integer> destinationInputIndices =
Collections.unmodifiableList(Collections.singletonList(0));
- AtomicBoolean stateChecked = new AtomicBoolean(false);
-
- final EventRouteMetadata commonRouteMeta =
- EventRouteMetadata.create(1, new int[]{0}, new int[]{0});
+ final AtomicBoolean stateChecked = new AtomicBoolean(false);
public OneToOneEdgeManager(EdgeManagerPluginContext context) {
super(context);
@@ -69,37 +64,6 @@ public class OneToOneEdgeManager extends EdgeManagerPluginOnDemand {
}
@Override
- public void prepareForRouting() throws Exception {
- checkState();
- }
-
- @Override
- public EventRouteMetadata routeDataMovementEventToDestination(
- int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex)
- throws Exception {
- if (sourceTaskIndex == destinationTaskIndex) {
- return commonRouteMeta;
- }
- return null;
- }
-
- @Override
- public @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination(
- int sourceTaskIndex, int destinationTaskIndex)
- throws Exception {
- if (sourceTaskIndex == destinationTaskIndex) {
- return commonRouteMeta;
- }
- return null;
- }
-
- @Override
- public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
- int sourceTaskIndex, int destinationTaskIndex) throws Exception {
- return commonRouteMeta;
- }
-
- @Override
public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
@@ -112,15 +76,10 @@ public class OneToOneEdgeManager extends EdgeManagerPluginOnDemand {
}
@Override
- public int routeInputErrorEventToSource(int destinationTaskIndex, int destinationFailedInputIndex) {
- return destinationTaskIndex;
- }
-
- @Override
public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
return 1;
}
-
+
private void checkState() {
if (stateChecked.get()) {
return;
@@ -133,5 +92,4 @@ public class OneToOneEdgeManager extends EdgeManagerPluginOnDemand {
+ getContext().getSourceVertexName() + " tasks: " + getContext().getSourceVertexNumTasks());
stateChecked.set(true);
}
-
}
http://git-wip-us.apache.org/repos/asf/tez/blob/20c71fd4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java
new file mode 100644
index 0000000..84e7e66
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java
@@ -0,0 +1,137 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nullable;
+
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+
+import com.google.common.base.Preconditions;
+
+public class OneToOneEdgeManagerOnDemand extends EdgeManagerPluginOnDemand {
+
+ final List<Integer> destinationInputIndices =
+ Collections.unmodifiableList(Collections.singletonList(0));
+ final AtomicBoolean stateChecked = new AtomicBoolean(false);
+
+ final EventRouteMetadata commonRouteMeta =
+ EventRouteMetadata.create(1, new int[]{0}, new int[]{0});
+
+ public OneToOneEdgeManagerOnDemand(EdgeManagerPluginContext context) {
+ super(context);
+ }
+
+ @Override
+ public void initialize() {
+ // Nothing to do.
+ }
+
+ @Override
+ public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
+ return 1;
+ }
+
+ @Override
+ public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
+ return 1;
+ }
+
+ @Override
+ public void routeDataMovementEventToDestination(DataMovementEvent event,
+ int sourceTaskIndex, int sourceOutputIndex,
+ Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
+ checkState();
+ destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
+ }
+
+ @Override
+ public void prepareForRouting() throws Exception {
+ checkState();
+ }
+
+ @Override
+ public EventRouteMetadata routeDataMovementEventToDestination(
+ int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex)
+ throws Exception {
+ if (sourceTaskIndex == destinationTaskIndex) {
+ return commonRouteMeta;
+ }
+ return null;
+ }
+
+ @Override
+ public @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination(
+ int sourceTaskIndex, int destinationTaskIndex)
+ throws Exception {
+ if (sourceTaskIndex == destinationTaskIndex) {
+ return commonRouteMeta;
+ }
+ return null;
+ }
+
+ @Override
+ public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
+ int sourceTaskIndex, int destinationTaskIndex) throws Exception {
+ return commonRouteMeta;
+ }
+
+ @Override
+ public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+ Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
+ destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
+ }
+
+ @Override
+ public int routeInputErrorEventToSource(InputReadErrorEvent event,
+ int destinationTaskIndex, int destinationFailedInputIndex) {
+ return destinationTaskIndex;
+ }
+
+ @Override
+ public int routeInputErrorEventToSource(int destinationTaskIndex, int destinationFailedInputIndex) {
+ return destinationTaskIndex;
+ }
+
+ @Override
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+ return 1;
+ }
+
+ private void checkState() {
+ if (stateChecked.get()) {
+ return;
+ }
+ // by the time routing is initiated all task counts must be determined and stable
+ Preconditions.checkState(getContext().getSourceVertexNumTasks() == getContext()
+ .getDestinationVertexNumTasks(), "1-1 source and destination task counts must match."
+ + " Destination: " + getContext().getDestinationVertexName() + " tasks: "
+ + getContext().getDestinationVertexNumTasks() + " Source: "
+ + getContext().getSourceVertexName() + " tasks: " + getContext().getSourceVertexNumTasks());
+ stateChecked.set(true);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/20c71fd4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
index 5718b17..eb03d1e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
@@ -53,6 +53,7 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
@@ -104,6 +105,36 @@ public class TestEdge {
.get(0).intValue());
}
+ @Test (timeout = 5000)
+ public void testOneToOneEdgeManagerODR() {
+ EdgeManagerPluginContext mockContext = mock(EdgeManagerPluginContext.class);
+ when(mockContext.getSourceVertexName()).thenReturn("Source");
+ when(mockContext.getDestinationVertexName()).thenReturn("Destination");
+ when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
+ OneToOneEdgeManagerOnDemand manager = new OneToOneEdgeManagerOnDemand(mockContext);
+ manager.initialize();
+ Map<Integer, List<Integer>> destinationTaskAndInputIndices = Maps.newHashMap();
+ DataMovementEvent event = DataMovementEvent.create(1, null);
+
+ // fail when source and destination are inconsistent
+ when(mockContext.getDestinationVertexNumTasks()).thenReturn(4);
+ try {
+ manager.routeDataMovementEventToDestination(event, 1, 1, destinationTaskAndInputIndices);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("1-1 source and destination task counts must match"));
+ }
+
+ // now make it consistent
+ when(mockContext.getDestinationVertexNumTasks()).thenReturn(3);
+ manager.routeDataMovementEventToDestination(event, 1, 1, destinationTaskAndInputIndices);
+ Assert.assertEquals(1, destinationTaskAndInputIndices.size());
+ Assert.assertEquals(1, destinationTaskAndInputIndices.entrySet().iterator().next().getKey()
+ .intValue());
+ Assert.assertEquals(0, destinationTaskAndInputIndices.entrySet().iterator().next().getValue()
+ .get(0).intValue());
+ }
+
@Test(timeout = 5000)
public void testScatterGatherManager() {
EdgeManagerPluginContext mockContext = mock(EdgeManagerPluginContext.class);
@@ -132,7 +163,7 @@ public class TestEdge {
EdgeProperty edgeProp = EdgeProperty.create(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, mock(OutputDescriptor.class),
mock(InputDescriptor.class));
- Edge edge = new Edge(edgeProp, eventHandler);
+ Edge edge = new Edge(edgeProp, eventHandler, new TezConfiguration());
TezVertexID srcVertexID = createVertexID(1);
TezVertexID destVertexID = createVertexID(2);
@@ -246,7 +277,7 @@ public class TestEdge {
DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
OutputDescriptor.create(""),
- InputDescriptor.create("")), mockEventHandler);
+ InputDescriptor.create("")), mockEventHandler, new TezConfiguration());
TezVertexID v1Id = createVertexID(1);
TezVertexID v2Id = createVertexID(2);
edge.setSourceVertex(mockVertex("v1", v1Id, new LinkedHashMap<TezTaskID, Task>()));
@@ -270,7 +301,7 @@ public class TestEdge {
DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
OutputDescriptor.create(""),
- InputDescriptor.create("")), mockEventHandler);
+ InputDescriptor.create("")), mockEventHandler, new TezConfiguration());
TezVertexID v1Id = createVertexID(1);
TezVertexID v2Id = createVertexID(2);
edge.setSourceVertex(mockVertex("v1", v1Id, new LinkedHashMap<TezTaskID, Task>()));
@@ -294,7 +325,7 @@ public class TestEdge {
DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
OutputDescriptor.create(""),
- InputDescriptor.create("")), mockEventHandler);
+ InputDescriptor.create("")), mockEventHandler, new TezConfiguration());
TezVertexID v1Id = createVertexID(1);
TezVertexID v2Id = createVertexID(2);
edge.setSourceVertex(mockVertex("v1", v1Id, new LinkedHashMap<TezTaskID, Task>()));
@@ -321,7 +352,7 @@ public class TestEdge {
DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
OutputDescriptor.create(""),
- InputDescriptor.create("")), mockEventHandler);
+ InputDescriptor.create("")), mockEventHandler, new TezConfiguration());
TezVertexID v1Id = createVertexID(1);
TezVertexID v2Id = createVertexID(2);
edge.setSourceVertex(mockVertex("v1", v1Id, new LinkedHashMap<TezTaskID, Task>()));
http://git-wip-us.apache.org/repos/asf/tez/blob/20c71fd4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index e569949..0176b79 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2217,7 +2217,7 @@ public class TestVertexImpl {
for (EdgePlan edgePlan : dagPlan.getEdgeList()) {
EdgeProperty edgeProperty = DagTypeConverters
.createEdgePropertyMapFromDAGPlan(edgePlan);
- edges.put(edgePlan.getId(), new Edge(edgeProperty, dispatcher.getEventHandler()));
+ edges.put(edgePlan.getId(), new Edge(edgeProperty, dispatcher.getEventHandler(), conf));
}
parseVertexEdges();
http://git-wip-us.apache.org/repos/asf/tez/blob/20c71fd4/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
index 7ba6028..49bb9f5 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
+import org.apache.tez.dag.app.dag.impl.OneToOneEdgeManagerOnDemand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -64,7 +65,6 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.event.VertexStateUpdate;
-import org.apache.tez.dag.app.dag.impl.OneToOneEdgeManager;
import org.apache.tez.dag.app.dag.impl.RootInputVertexManager;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
@@ -154,6 +154,7 @@ public class TestExceptionPropagation {
tezConf
.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
+ tezConf.setBoolean(TezConfiguration.TEZ_AM_ONE_TO_ONE_ROUTING_USE_ON_DEMAND_ROUTING, true);
tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
@@ -753,7 +754,7 @@ public class TestExceptionPropagation {
}
// EdgeManager for edge linking vertex1 and vertex2
- public static class CustomEdgeManager extends OneToOneEdgeManager {
+ public static class CustomEdgeManager extends OneToOneEdgeManagerOnDemand {
private ExceptionLocation exLocation;
@@ -822,7 +823,7 @@ public class TestExceptionPropagation {
}
super.prepareForRouting();
}
-
+
@Override
public EventRouteMetadata routeDataMovementEventToDestination(
int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex) throws Exception {