You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/04/04 23:58:45 UTC
git commit: TEZ-1015. Dag failed with Invalid event: V_ROUTE_EVENT at
RECOVERING. (hitesh)
Repository: incubator-tez
Updated Branches:
refs/heads/master a6cd1820c -> c76ef84b8
TEZ-1015. Dag failed with Invalid event: V_ROUTE_EVENT at RECOVERING. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c76ef84b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c76ef84b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c76ef84b
Branch: refs/heads/master
Commit: c76ef84b825a38df01082df86dbe8642ecfd6269
Parents: a6cd182
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Apr 4 14:56:54 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Apr 4 14:56:54 2014 -0700
----------------------------------------------------------------------
.../event/VertexEventSourceVertexRecovered.java | 9 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 118 ++++++++++++++++---
2 files changed, 108 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c76ef84b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
index 9481849..e3b9334 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
@@ -29,15 +29,18 @@ public class VertexEventSourceVertexRecovered extends VertexEvent {
VertexState sourceVertexState;
TezVertexID sourceVertexID;
List<TezTaskAttemptID> completedTaskAttempts;
+ int sourceDistanceFromRoot;
public VertexEventSourceVertexRecovered(TezVertexID vertexID,
TezVertexID sourceVertexID,
VertexState sourceVertexState,
- List<TezTaskAttemptID> completedTaskAttempts) {
+ List<TezTaskAttemptID> completedTaskAttempts,
+ int sourceDistanceFromRoot) {
super(vertexID, VertexEventType.V_SOURCE_VERTEX_RECOVERED);
this.sourceVertexState = sourceVertexState;
this.sourceVertexID = sourceVertexID;
this.completedTaskAttempts = completedTaskAttempts;
+ this.sourceDistanceFromRoot = sourceDistanceFromRoot;
}
public VertexState getSourceVertexState() {
@@ -52,4 +55,8 @@ public class VertexEventSourceVertexRecovered extends VertexEvent {
return completedTaskAttempts;
}
+ public int getSourceDistanceFromRoot() {
+ return sourceDistanceFromRoot;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c76ef84b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 270c5ac..8e1f04b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -227,10 +228,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Transitions from NEW state
.addTransition
(VertexState.NEW,
- EnumSet.of(VertexState.NEW, VertexState.INITED,
- VertexState.INITIALIZING, VertexState.FAILED),
- VertexEventType.V_INIT,
- new InitTransition())
+ EnumSet.of(VertexState.NEW, VertexState.INITED,
+ VertexState.INITIALIZING, VertexState.FAILED),
+ VertexEventType.V_INIT,
+ new InitTransition())
.addTransition
(VertexState.NEW,
EnumSet.of(VertexState.NEW, VertexState.INITED,
@@ -250,6 +251,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_SOURCE_VERTEX_RECOVERED,
new RecoverTransition())
.addTransition
+ (VertexState.RECOVERING, VertexState.RECOVERING,
+ EnumSet.of(VertexEventType.V_INIT,
+ VertexEventType.V_ROUTE_EVENT,
+ VertexEventType.V_SOURCE_VERTEX_STARTED,
+ VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED),
+ new BufferDataRecoverTransition())
+ .addTransition
+ (VertexState.RECOVERING, VertexState.RECOVERING,
+ VertexEventType.V_TERMINATE,
+ new TerminateDuringRecoverTransition())
+ .addTransition
(VertexState.NEW,
EnumSet.of(VertexState.INITED,
VertexState.INITIALIZING, VertexState.RUNNING,
@@ -430,7 +442,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
- VertexEventType.V_ROOT_INPUT_FAILED))
+ VertexEventType.V_ROOT_INPUT_FAILED,
+ VertexEventType.V_SOURCE_VERTEX_RECOVERED))
// Transitions from KILLED state
.addTransition(
@@ -450,7 +463,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_COMPLETED,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
- VertexEventType.V_ROOT_INPUT_FAILED))
+ VertexEventType.V_ROOT_INPUT_FAILED,
+ VertexEventType.V_SOURCE_VERTEX_RECOVERED))
// No transitions from INTERNAL_ERROR state. Ignore all.
.addTransition(
@@ -468,7 +482,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_TASK_RESCHEDULED,
VertexEventType.V_INTERNAL_ERROR,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
- VertexEventType.V_ROOT_INPUT_FAILED))
+ VertexEventType.V_ROOT_INPUT_FAILED,
+ VertexEventType.V_SOURCE_VERTEX_RECOVERED))
// create the topology tables
.installTopology();
@@ -510,7 +525,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private boolean tasksNotYetScheduled = true;
// We may always store task events in the vertex for scalability
List<TezEvent> pendingTaskEvents = Lists.newLinkedList();
- List<TezEvent> pendingRouteEventsWhileIniting = null;
+ List<TezEvent> pendingRouteEvents = new LinkedList<TezEvent>();
List<TezTaskAttemptID> pendingReportedSrcCompletions = Lists.newLinkedList();
private RootInputInitializerRunner rootInputInitializer;
@@ -1952,7 +1967,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
vertex.eventHandler.handle(new VertexEventSourceVertexRecovered(
entry.getKey().getVertexId(),
- vertex.vertexId, endState, null));
+ vertex.vertexId, endState, null,
+ vertex.getDistanceFromRoot()));
}
}
if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED)
@@ -2015,6 +2031,51 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
+ public static class TerminateDuringRecoverTransition implements
+ SingleArcTransition<VertexImpl, VertexEvent> {
+ // Handles V_TERMINATE while the vertex is RECOVERING: the state machine stays in RECOVERING and only the recorded recoveredState is overridden.
+ @Override
+ public void transition(VertexImpl vertex, VertexEvent vertexEvent) {
+ LOG.info("Received a terminate during recovering, setting recovered"
+ + " state to KILLED");
+ vertex.recoveredState = VertexState.KILLED; // NOTE(review): presumably read back when recovery completes - confirm against RecoverTransition
+ }
+
+ }
+
+ public static class BufferDataRecoverTransition implements
+ SingleArcTransition<VertexImpl, VertexEvent> {
+ // Registered for V_INIT, V_ROUTE_EVENT, V_SOURCE_VERTEX_STARTED and V_SOURCE_TASK_ATTEMPT_COMPLETED in RECOVERING; buffers or counts each event and leaves the state unchanged.
+ @Override
+ public void transition(VertexImpl vertex, VertexEvent vertexEvent) {
+ LOG.info("Received upstream event while still recovering"
+ + ", vertexId=" + vertex.logIdentifier
+ + ", vertexEventType=" + vertexEvent.getType());
+ if (vertexEvent.getType().equals(VertexEventType.V_ROUTE_EVENT)) {
+ VertexEventRouteEvent evt = (VertexEventRouteEvent) vertexEvent;
+ vertex.pendingRouteEvents.addAll(evt.getEvents()); // held in pendingRouteEvents until a later transition drains the list
+ } else if (vertexEvent.getType().equals(
+ VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED)) {
+ VertexEventSourceTaskAttemptCompleted evt =
+ (VertexEventSourceTaskAttemptCompleted) vertexEvent;
+ vertex.pendingReportedSrcCompletions.add( // only the attempt id is remembered, not the whole completion event
+ evt.getCompletionEvent().getTaskAttemptId());
+ } else if (vertexEvent.getType().equals(
+ VertexEventType.V_SOURCE_VERTEX_STARTED)) {
+ VertexEventSourceVertexStarted startEvent =
+ (VertexEventSourceVertexStarted) vertexEvent;
+ int distanceFromRoot = startEvent.getSourceDistanceFromRoot() + 1; // this vertex is one level below the source that started
+ if(vertex.distanceFromRoot < distanceFromRoot) { // keep the largest distance seen across all sources
+ vertex.distanceFromRoot = distanceFromRoot;
+ }
+ ++vertex.numStartedSourceVertices;
+ } else if (vertexEvent.getType().equals(VertexEventType.V_INIT)) {
+ ++vertex.numInitedSourceVertices; // V_INIT is only counted here; no initialization is performed by this transition
+ }
+ }
+ }
+
+
public static class RecoverTransition implements
MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
@@ -2022,6 +2083,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
VertexEventSourceVertexRecovered sourceRecoveredEvent =
(VertexEventSourceVertexRecovered) vertexEvent;
+ // Use distance from root from Recovery events as upstream vertices may not
+ // send source vertex started event that is used to compute distance
+ int distanceFromRoot = sourceRecoveredEvent.getSourceDistanceFromRoot() + 1;
+ if(vertex.distanceFromRoot < distanceFromRoot) {
+ vertex.distanceFromRoot = distanceFromRoot;
+ }
+
++vertex.numRecoveredSourceVertices;
switch (sourceRecoveredEvent.getSourceVertexState()) {
@@ -2224,13 +2292,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
vertex.eventHandler.handle(new VertexEventSourceVertexRecovered(
entry.getKey().getVertexId(),
- vertex.vertexId, endState, completedTaskAttempts));
+ vertex.vertexId, endState, completedTaskAttempts,
+ vertex.getDistanceFromRoot()));
}
if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED)
.contains(endState)) {
// Send events downstream
vertex.routeRecoveredEvents(endState, vertex.recoveredEvents);
vertex.recoveredEvents.clear();
+ if (!vertex.pendingRouteEvents.isEmpty()) {
+ VertexImpl.ROUTE_EVENT_TRANSITION.transition(vertex,
+ new VertexEventRouteEvent(vertex.getVertexId(),
+ vertex.pendingRouteEvents));
+ vertex.pendingRouteEvents.clear();
+ }
} else {
// Ensure no recovered events
if (!vertex.recoveredEvents.isEmpty()) {
@@ -2272,8 +2347,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
public VertexState transition(VertexImpl vertex, VertexEvent event) {
VertexState vertexState = VertexState.NEW;
vertex.numInitedSourceVertices++;
+ // TODO fix this as part of TEZ-1008
+ // Should have a different way to infer source vertices INITED
+ // as compared to a recovery triggered INIT
+ // In normal flow, upstream vertices send a V_INIT downstream to
+ // trigger an init of the downstream vertex. In case of recovery,
+ // upstream vertices may not send this event if they are already in a
+ // RUNNING or completed state. Hence, recovering vertices may send
+ // themselves a V_INIT to trigger a transition. Hence, the count may
+ // go one over.
if (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() ||
- vertex.numInitedSourceVertices == vertex.sourceVertices.size()) {
+ (vertex.numInitedSourceVertices == vertex.sourceVertices.size()
+ || vertex.numInitedSourceVertices == (vertex.sourceVertices.size()+1))) {
vertexState = handleInitEvent(vertex, event);
if (vertexState != VertexState.FAILED) {
if (vertex.targetVertices != null && !vertex.targetVertices.isEmpty()) {
@@ -2378,10 +2463,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
// Vertex will be moving to INITED state, safe to process pending route events.
- if (pendingRouteEventsWhileIniting != null) {
+ if (!pendingRouteEvents.isEmpty()) {
VertexImpl.ROUTE_EVENT_TRANSITION.transition(this,
- new VertexEventRouteEvent(getVertexId(), pendingRouteEventsWhileIniting));
- pendingRouteEventsWhileIniting = null;
+ new VertexEventRouteEvent(getVertexId(), pendingRouteEvents));
+ pendingRouteEvents.clear();
}
return vertexState;
}
@@ -2863,12 +2948,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public void transition(VertexImpl vertex, VertexEvent event) {
VertexEventRouteEvent re = (VertexEventRouteEvent) event;
- if (vertex.pendingRouteEventsWhileIniting == null) {
- vertex.pendingRouteEventsWhileIniting = Lists.newLinkedList();
- }
// Store the events for post-init routing, since INIT state is when
// initial task parallelism will be set
- vertex.pendingRouteEventsWhileIniting.addAll(re.getEvents());
+ vertex.pendingRouteEvents.addAll(re.getEvents());
}
}