You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/04/04 23:58:45 UTC

git commit: TEZ-1015. Dag failed with Invalid event: V_ROUTE_EVENT at RECOVERING. (hitesh)

Repository: incubator-tez
Updated Branches:
  refs/heads/master a6cd1820c -> c76ef84b8


TEZ-1015. Dag failed with Invalid event: V_ROUTE_EVENT at RECOVERING. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c76ef84b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c76ef84b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c76ef84b

Branch: refs/heads/master
Commit: c76ef84b825a38df01082df86dbe8642ecfd6269
Parents: a6cd182
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Apr 4 14:56:54 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Apr 4 14:56:54 2014 -0700

----------------------------------------------------------------------
 .../event/VertexEventSourceVertexRecovered.java |   9 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 118 ++++++++++++++++---
 2 files changed, 108 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c76ef84b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
index 9481849..e3b9334 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
@@ -29,15 +29,18 @@ public class VertexEventSourceVertexRecovered extends VertexEvent {
   VertexState sourceVertexState;
   TezVertexID sourceVertexID;
   List<TezTaskAttemptID> completedTaskAttempts;
+  int sourceDistanceFromRoot;
 
   public VertexEventSourceVertexRecovered(TezVertexID vertexID,
       TezVertexID sourceVertexID,
       VertexState sourceVertexState,
-      List<TezTaskAttemptID> completedTaskAttempts) {
+      List<TezTaskAttemptID> completedTaskAttempts,
+      int sourceDistanceFromRoot) {
     super(vertexID, VertexEventType.V_SOURCE_VERTEX_RECOVERED);
     this.sourceVertexState = sourceVertexState;
     this.sourceVertexID = sourceVertexID;
     this.completedTaskAttempts = completedTaskAttempts;
+    this.sourceDistanceFromRoot = sourceDistanceFromRoot;
   }
 
   public VertexState getSourceVertexState() {
@@ -52,4 +55,8 @@ public class VertexEventSourceVertexRecovered extends VertexEvent {
     return completedTaskAttempts;
   }
 
+  public int getSourceDistanceFromRoot() {
+    return sourceDistanceFromRoot;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c76ef84b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 270c5ac..8e1f04b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -227,10 +228,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // Transitions from NEW state
           .addTransition
               (VertexState.NEW,
-              EnumSet.of(VertexState.NEW, VertexState.INITED, 
-                  VertexState.INITIALIZING, VertexState.FAILED),
-              VertexEventType.V_INIT,
-              new InitTransition())
+                  EnumSet.of(VertexState.NEW, VertexState.INITED,
+                      VertexState.INITIALIZING, VertexState.FAILED),
+                  VertexEventType.V_INIT,
+                  new InitTransition())
           .addTransition
               (VertexState.NEW,
                   EnumSet.of(VertexState.NEW, VertexState.INITED,
@@ -250,6 +251,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_SOURCE_VERTEX_RECOVERED,
                   new RecoverTransition())
           .addTransition
+              (VertexState.RECOVERING, VertexState.RECOVERING,
+                  EnumSet.of(VertexEventType.V_INIT,
+                      VertexEventType.V_ROUTE_EVENT,
+                      VertexEventType.V_SOURCE_VERTEX_STARTED,
+                      VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED),
+                  new BufferDataRecoverTransition())
+          .addTransition
+              (VertexState.RECOVERING, VertexState.RECOVERING,
+                  VertexEventType.V_TERMINATE,
+                  new TerminateDuringRecoverTransition())
+          .addTransition
               (VertexState.NEW,
                   EnumSet.of(VertexState.INITED,
                       VertexState.INITIALIZING, VertexState.RUNNING,
@@ -430,7 +442,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
-                  VertexEventType.V_ROOT_INPUT_FAILED))
+                  VertexEventType.V_ROOT_INPUT_FAILED,
+                  VertexEventType.V_SOURCE_VERTEX_RECOVERED))
 
           // Transitions from KILLED state
           .addTransition(
@@ -450,7 +463,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED,
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
-                  VertexEventType.V_ROOT_INPUT_FAILED))
+                  VertexEventType.V_ROOT_INPUT_FAILED,
+                  VertexEventType.V_SOURCE_VERTEX_RECOVERED))
 
           // No transitions from INTERNAL_ERROR state. Ignore all.
           .addTransition(
@@ -468,7 +482,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_TASK_RESCHEDULED,
                   VertexEventType.V_INTERNAL_ERROR,
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
-                  VertexEventType.V_ROOT_INPUT_FAILED))
+                  VertexEventType.V_ROOT_INPUT_FAILED,
+                  VertexEventType.V_SOURCE_VERTEX_RECOVERED))
           // create the topology tables
           .installTopology();
 
@@ -510,7 +525,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private boolean tasksNotYetScheduled = true;
   // We may always store task events in the vertex for scalability
   List<TezEvent> pendingTaskEvents = Lists.newLinkedList();
-  List<TezEvent> pendingRouteEventsWhileIniting = null;
+  List<TezEvent> pendingRouteEvents = new LinkedList<TezEvent>();
   List<TezTaskAttemptID> pendingReportedSrcCompletions = Lists.newLinkedList();
 
   private RootInputInitializerRunner rootInputInitializer;
@@ -1952,7 +1967,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
           vertex.eventHandler.handle(new VertexEventSourceVertexRecovered(
               entry.getKey().getVertexId(),
-              vertex.vertexId, endState, null));
+              vertex.vertexId, endState, null,
+              vertex.getDistanceFromRoot()));
         }
       }
       if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED)
@@ -2015,6 +2031,51 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
+  public static class TerminateDuringRecoverTransition implements
+      SingleArcTransition<VertexImpl, VertexEvent> {
+
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent vertexEvent) {
+      LOG.info("Received a terminate during recovering, setting recovered"
+          + " state to KILLED");
+      vertex.recoveredState = VertexState.KILLED;
+    }
+
+  }
+
+  public static class BufferDataRecoverTransition implements
+      SingleArcTransition<VertexImpl, VertexEvent> {
+
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent vertexEvent) {
+      LOG.info("Received upstream event while still recovering"
+          + ", vertexId=" + vertex.logIdentifier
+          + ", vertexEventType=" + vertexEvent.getType());
+      if (vertexEvent.getType().equals(VertexEventType.V_ROUTE_EVENT)) {
+        VertexEventRouteEvent evt = (VertexEventRouteEvent) vertexEvent;
+        vertex.pendingRouteEvents.addAll(evt.getEvents());
+      } else if (vertexEvent.getType().equals(
+          VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED)) {
+        VertexEventSourceTaskAttemptCompleted evt =
+            (VertexEventSourceTaskAttemptCompleted) vertexEvent;
+        vertex.pendingReportedSrcCompletions.add(
+            evt.getCompletionEvent().getTaskAttemptId());
+      } else if (vertexEvent.getType().equals(
+          VertexEventType.V_SOURCE_VERTEX_STARTED)) {
+        VertexEventSourceVertexStarted startEvent =
+            (VertexEventSourceVertexStarted) vertexEvent;
+        int distanceFromRoot = startEvent.getSourceDistanceFromRoot() + 1;
+        if(vertex.distanceFromRoot < distanceFromRoot) {
+          vertex.distanceFromRoot = distanceFromRoot;
+        }
+        ++vertex.numStartedSourceVertices;
+      } else if (vertexEvent.getType().equals(VertexEventType.V_INIT)) {
+        ++vertex.numInitedSourceVertices;
+      }
+    }
+  }
+
+
   public static class RecoverTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
@@ -2022,6 +2083,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
       VertexEventSourceVertexRecovered sourceRecoveredEvent =
           (VertexEventSourceVertexRecovered) vertexEvent;
+      // Take the distance from root from the recovery event, since upstream
+      // vertices may not send the source-vertex-started event used to compute it
+      int distanceFromRoot = sourceRecoveredEvent.getSourceDistanceFromRoot() + 1;
+      if(vertex.distanceFromRoot < distanceFromRoot) {
+        vertex.distanceFromRoot = distanceFromRoot;
+      }
+
       ++vertex.numRecoveredSourceVertices;
 
       switch (sourceRecoveredEvent.getSourceVertexState()) {
@@ -2224,13 +2292,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
         vertex.eventHandler.handle(new VertexEventSourceVertexRecovered(
             entry.getKey().getVertexId(),
-            vertex.vertexId, endState, completedTaskAttempts));
+            vertex.vertexId, endState, completedTaskAttempts,
+            vertex.getDistanceFromRoot()));
       }
       if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED)
           .contains(endState)) {
         // Send events downstream
         vertex.routeRecoveredEvents(endState, vertex.recoveredEvents);
         vertex.recoveredEvents.clear();
+        if (!vertex.pendingRouteEvents.isEmpty()) {
+          VertexImpl.ROUTE_EVENT_TRANSITION.transition(vertex,
+              new VertexEventRouteEvent(vertex.getVertexId(),
+                  vertex.pendingRouteEvents));
+          vertex.pendingRouteEvents.clear();
+        }
       } else {
         // Ensure no recovered events
         if (!vertex.recoveredEvents.isEmpty()) {
@@ -2272,8 +2347,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
       VertexState vertexState = VertexState.NEW;
       vertex.numInitedSourceVertices++;
+      // TODO fix this as part of TEZ-1008.
+      // There should be a separate way to distinguish a source vertex
+      // reaching INITED from a recovery-triggered INIT.
+      // In the normal flow, upstream vertices send a V_INIT downstream to
+      // trigger an init of the downstream vertex. During recovery, upstream
+      // vertices may not send this event if they are already in a RUNNING
+      // or completed state, so a recovering vertex may send itself a V_INIT
+      // to trigger the transition. As a result, numInitedSourceVertices may
+      // exceed the number of source vertices by one.
       if (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() ||
-          vertex.numInitedSourceVertices == vertex.sourceVertices.size()) {
+          (vertex.numInitedSourceVertices == vertex.sourceVertices.size()
+            || vertex.numInitedSourceVertices == (vertex.sourceVertices.size()+1))) {
         vertexState = handleInitEvent(vertex, event);
         if (vertexState != VertexState.FAILED) {
           if (vertex.targetVertices != null && !vertex.targetVertices.isEmpty()) {
@@ -2378,10 +2463,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
 
     // Vertex will be moving to INITED state, safe to process pending route events.
-    if (pendingRouteEventsWhileIniting != null) {
+    if (!pendingRouteEvents.isEmpty()) {
       VertexImpl.ROUTE_EVENT_TRANSITION.transition(this,
-          new VertexEventRouteEvent(getVertexId(), pendingRouteEventsWhileIniting));
-      pendingRouteEventsWhileIniting = null;
+          new VertexEventRouteEvent(getVertexId(), pendingRouteEvents));
+      pendingRouteEvents.clear();
     }
     return vertexState;
   }
@@ -2863,12 +2948,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {
       VertexEventRouteEvent re = (VertexEventRouteEvent) event;
-      if (vertex.pendingRouteEventsWhileIniting == null) {
-        vertex.pendingRouteEventsWhileIniting = Lists.newLinkedList();
-      }
       // Store the events for post-init routing, since INIT state is when
       // initial task parallelism will be set
-      vertex.pendingRouteEventsWhileIniting.addAll(re.getEvents());
+      vertex.pendingRouteEvents.addAll(re.getEvents());
     }
   }