You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/04/30 03:43:22 UTC

svn commit: r1477447 - /incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java

Author: bikas
Date: Tue Apr 30 01:43:21 2013
New Revision: 1477447

URL: http://svn.apache.org/r1477447
Log:
TEZ-39 Fix VertexImpl.checkVertexCompleteSuccess (bikas)

Modified:
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java?rev=1477447&r1=1477446&r2=1477447&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java Tue Apr 30 01:43:21 2013
@@ -87,7 +87,6 @@ import org.apache.tez.dag.app.dag.event.
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
-import org.apache.tez.engine.common.security.JobTokenSecretManager;
 import org.apache.tez.engine.records.TezDAGID;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 import org.apache.tez.engine.records.TezTaskAttemptID;
@@ -231,10 +230,10 @@ public class VertexImpl implements org.a
               (VertexState.KILL_WAIT,
               EnumSet.of(VertexState.KILL_WAIT, VertexState.KILLED),
               VertexEventType.V_TASK_COMPLETED,
-              new KillWaitTaskCompletedTransition())
+              new TaskCompletedTransition())
           .addTransition(VertexState.KILL_WAIT, VertexState.KILL_WAIT,
               VertexEventType.V_TASK_ATTEMPT_COMPLETED,
-              TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
+              TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) // TODO shouldnt be done for KILL_WAIT vertex
           .addTransition(VertexState.KILL_WAIT, VertexState.KILL_WAIT,
               VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
               SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
@@ -673,39 +672,49 @@ public class VertexImpl implements org.a
     return FileSystem.get(conf);
   }
 
-  static VertexState checkVertexCompleteSuccess(VertexImpl vertex) {
-    // TODO TEZ-39 this vertex is definitely buggy as completed includes
-    // killed/failed check for vertex success
-    if (vertex.completedTaskCount == vertex.tasks.size()) {
-      if (vertex.failedTaskCount > 0) {
-        try {
-          vertex.committer.abortVertex(VertexStatus.State.FAILED);
-        } catch (IOException e) {
-          LOG.error("Failed to do abort on vertex, name=" + vertex.getName(),
-              e);
+  static VertexState checkVertexForCompletion(VertexImpl vertex) {
+    //check for vertex failure first
+    if (vertex.failedTaskCount > 1) {
+      vertex.setFinishTime();
+
+      String diagnosticMsg = "Vertex failed as tasks failed. "
+          + "failedTasks:"
+          + vertex.failedTaskCount;
+      LOG.info(diagnosticMsg);
+      vertex.addDiagnostic(diagnosticMsg);
+      vertex.abortVertex(VertexStatus.State.FAILED);
+      vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
+          .getVertexId(), VertexState.FAILED));
+      return vertex.finished(VertexState.FAILED);
+    }
+    
+    if(vertex.succeededTaskCount == vertex.tasks.size()) {
+      try {
+        if (!vertex.committed.getAndSet(true)) {
+          // commit only once
+          vertex.committer.commitVertex();
         }
+      } catch (IOException e) {
+        LOG.error("Failed to do commit on vertex, name=" + vertex.getName(), e);
         vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
             .getVertexId(), VertexState.FAILED));
         return vertex.finished(VertexState.FAILED);
-      } else {
-        try {
-          if (!vertex.committed.getAndSet(true)) {
-            // commit only once
-            vertex.committer.commitVertex();
-          }
-        } catch (IOException e) {
-          LOG.error("Failed to do commit on vertex, name=" + vertex.getName(), e);
-          vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
-              .getVertexId(), VertexState.FAILED));
-          return vertex.finished(VertexState.FAILED);
-        }
-        vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
-            .getVertexId(), vertex.getState()));
-        return vertex.finished(VertexState.SUCCEEDED);
       }
+      vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
+          .getVertexId(), vertex.getState()));
+      return vertex.finished(VertexState.SUCCEEDED);      
     }
-    // TODO: what if one of the tasks failed?
-    return null;
+    
+    if (vertex.completedTaskCount == vertex.tasks.size()) {
+      // this means the vertex has some killed tasks
+      assert vertex.killedTaskCount > 0;
+      vertex.setFinishTime();
+      vertex.abortVertex(VertexStatus.State.KILLED);
+      return vertex.finished(VertexState.KILLED);
+    }
+    
+    //return the current state, Vertex not finished yet
+    return vertex.getInternalState();    
   }
 
   VertexState finished(VertexState finalState) {
@@ -1033,6 +1042,7 @@ public class VertexImpl implements org.a
     }
   }
 
+  // TODO Why is TA event coming directly to Vertex instead of TA -> Task -> Vertex
   private static class TaskAttemptCompletedEventTransition implements
       SingleArcTransition<VertexImpl, VertexEvent> {
     @Override
@@ -1107,39 +1117,13 @@ public class VertexImpl implements org.a
       }
 
       vertex.vertexScheduler.onVertexCompleted();
-      VertexState state = checkVertexForCompletion(vertex);
+      VertexState state = VertexImpl.checkVertexForCompletion(vertex);
       if(state == VertexState.SUCCEEDED) {
         vertex.vertexScheduler.onVertexCompleted();
       }
       return state;
     }
 
-    protected VertexState checkVertexForCompletion(VertexImpl vertex) {
-      //check for vertex failure
-      if (vertex.failedTaskCount > 1) {
-        vertex.setFinishTime();
-
-        String diagnosticMsg = "Vertex failed as tasks failed. "
-            + "failedTasks:"
-            + vertex.failedTaskCount;
-        LOG.info(diagnosticMsg);
-        vertex.addDiagnostic(diagnosticMsg);
-        vertex.abortVertex(VertexStatus.State.FAILED);
-        vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
-            .getVertexId(), VertexState.FAILED));
-        return vertex.finished(VertexState.FAILED);
-      }
-
-      VertexState vertexCompleteSuccess =
-          VertexImpl.checkVertexCompleteSuccess(vertex);
-      if (vertexCompleteSuccess != null) {
-        return vertexCompleteSuccess;
-      }
-
-      //return the current state, Vertex not finished yet
-      return vertex.getInternalState();
-    }
-
     private void taskSucceeded(VertexImpl vertex, Task task) {
       vertex.succeededTaskCount++;
       // TODO Metrics
@@ -1168,7 +1152,7 @@ public class VertexImpl implements org.a
     @Override
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
       VertexState vertexCompleteSuccess =
-          VertexImpl.checkVertexCompleteSuccess(vertex);
+          VertexImpl.checkVertexForCompletion(vertex);
       if (vertexCompleteSuccess != null) {
         return vertexCompleteSuccess;
       }
@@ -1188,20 +1172,6 @@ public class VertexImpl implements org.a
     }
   }
 
-  private static class KillWaitTaskCompletedTransition extends
-      TaskCompletedTransition {
-    @Override
-    protected VertexState checkVertexForCompletion(VertexImpl vertex) {
-      if (vertex.completedTaskCount == vertex.tasks.size()) {
-        vertex.setFinishTime();
-        vertex.abortVertex(VertexStatus.State.KILLED);
-        return vertex.finished(VertexState.KILLED);
-      }
-      //return the current state, Job not finished yet
-      return vertex.getInternalState();
-    }
-  }
-
   private void addDiagnostic(String diag) {
     diagnostics.add(diag);
   }