You are viewing a plain-text version of this content. The canonical version is available at the original link (the hyperlink was not preserved in this plain-text copy).
Posted to commits@tez.apache.org by bi...@apache.org on 2013/05/09 03:30:15 UTC

git commit: TEZ-108. DAG should track numCompleted and numSuccessful vertices separately.

Updated Branches:
  refs/heads/TEZ-1 df1038e1a -> 6f27ac079


TEZ-108. DAG should track numCompleted and numSuccessful vertices separately.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6f27ac07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6f27ac07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6f27ac07

Branch: refs/heads/TEZ-1
Commit: 6f27ac079065334c8a72ff63fb5a81bb41d794ce
Parents: df1038e
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed May 8 16:33:32 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed May 8 18:27:31 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/tez/dag/app/dag/DAG.java  |    2 +-
 .../org/apache/tez/dag/app/dag/impl/DAGImpl.java   |  114 ++++++---------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java    |    2 +-
 3 files changed, 49 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6f27ac07/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index a3fb3b7..03bf570 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -52,7 +52,7 @@ public interface DAG {
   Vertex getVertex(TezVertexID vertexId);
   List<String> getDiagnostics();
   int getTotalVertices();
-  int getCompletedVertices();
+  int getSuccessfulVertices();
   float getProgress();
   boolean isUber();
   String getUserName();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6f27ac07/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index c37a9e9..0e1206f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -85,7 +85,6 @@ import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.common.security.JobTokenSecretManager;
 import org.apache.tez.engine.common.security.TokenCache;
 import org.apache.tez.engine.records.TezDAGID;
-import org.apache.tez.engine.records.TezTaskAttemptID;
 import org.apache.tez.engine.records.TezVertexID;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 
@@ -117,12 +116,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   
   private DAGScheduler dagScheduler;
 
-  /**
-   * maps nodes to tasks that have run on those nodes
-   */
-  private final HashMap<NodeId, List<TezTaskAttemptID>>
-    nodesToSucceededTaskAttempts = new HashMap<NodeId, List<TezTaskAttemptID>>();
-
   private final EventHandler eventHandler;
   // TODO Metrics
   //private final MRAppMetrics metrics;
@@ -232,7 +225,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
               (DAGState.KILL_WAIT,
               EnumSet.of(DAGState.KILL_WAIT, DAGState.KILLED),
               DAGEventType.DAG_VERTEX_COMPLETED,
-              new KillWaitTaskCompletedTransition())
+              new VertexCompletedTransition())
           .addTransition(DAGState.KILL_WAIT, DAGState.KILL_WAIT,
               DAGEventType.DAG_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
@@ -307,6 +300,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   //changing fields while the job is running
   private int numVertices;
   private int numCompletedVertices = 0;
+  private int numSuccessfulVertices = 0;
   private int numFailedVertices = 0;
   private int numKilledVertices = 0;
   private boolean isUber = false;
@@ -619,22 +613,47 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     return FileSystem.get(conf);
   }
 
-  static DAGState checkJobCompleteSuccess(DAGImpl dag) {
-    // check for dag success
+  static DAGState checkJobForCompletion(DAGImpl dag) {
+
     LOG.info("ZZZZ: Checking dag completion"
         + ", numCompletedVertices=" + dag.numCompletedVertices
+        + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
         + ", numFailedVertices=" + dag.numFailedVertices
         + ", numKilledVertices=" + dag.numKilledVertices
         + ", numVertices=" + dag.numVertices);
 
-    if (dag.numCompletedVertices == dag.vertices.size()) {
-      // TODO: Maybe set cleanup progress. Otherwise dag progress will
-      // always stay at 0.95 when reported from an AM.
-      // TODO DAG committer
+    if (dag.numFailedVertices > 0) {
+      dag.setFinishTime();
+      String diagnosticMsg = "DAG failed as vertices failed. " +
+          " failedVertices:" + dag.numFailedVertices +
+          " killedVertices:" + dag.numKilledVertices;
+      LOG.info(diagnosticMsg);
+      dag.addDiagnostic(diagnosticMsg);
+      dag.abortJob(DAGStatus.State.FAILED);
+      return dag.finished(DAGState.FAILED);
+    }
+
+    if(dag.numSuccessfulVertices == dag.numVertices) {
+      dag.setFinishTime();
       dag.logJobHistoryFinishedEvent();
       return dag.finished(DAGState.SUCCEEDED);
     }
-    return null;
+
+    if (dag.numCompletedVertices == dag.numVertices) {
+      // this means the dag has some killed vertices
+      String diagnosticMsg = "DAG killed. " +
+          " failedVertices:" + dag.numFailedVertices +
+          " killedVertices:" + dag.numKilledVertices;
+      LOG.info(diagnosticMsg);
+      dag.addDiagnostic(diagnosticMsg);
+      assert dag.numKilledVertices > 0;
+      dag.setFinishTime();
+      dag.abortJob(DAGStatus.State.KILLED);
+      return dag.finished(DAGState.KILLED);
+    }
+
+    //return the current state, Job not finished yet
+    return dag.getInternalState();
   }
 
   DAGState finished(DAGState finalState) {
@@ -690,10 +709,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   @Override
-  public int getCompletedVertices() {
+  public int getSuccessfulVertices() {
     readLock.lock();
     try {
-      return numCompletedVertices;
+      return numSuccessfulVertices;
     } finally {
       readLock.unlock();
     }
@@ -1023,6 +1042,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           + ", vertexState=" + vertexEvent.getVertexState());
 
       Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
+      job.numCompletedVertices++;
       if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
         vertexSucceeded(job, vertex);
       } else if (vertexEvent.getVertexState() == VertexState.FAILED) {
@@ -1031,40 +1051,20 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         vertexKilled(job, vertex);
       }
 
-      LOG.info("ZZZZ: Num completed vertices: " + job.numCompletedVertices
-          + ", Num failed vertices: " + job.numFailedVertices
-          + ", Num killed vertices: " + job.numKilledVertices);
+      LOG.info("ZZZZ: Vertex completed."
+          + ", numCompletedVertices=" + job.numCompletedVertices
+          + ", numSuccessfulVertices=" + job.numSuccessfulVertices
+          + ", numFailedVertices=" + job.numFailedVertices
+          + ", numKilledVertices=" + job.numKilledVertices
+          + ", numVertices=" + job.numVertices);
 
       job.dagScheduler.vertexCompleted(vertex);
 
       return checkJobForCompletion(job);
     }
 
-    protected DAGState checkJobForCompletion(DAGImpl job) {
-      //check for Job failure
-      if (job.numFailedVertices > 0) {
-        job.setFinishTime();
-
-        String diagnosticMsg = "Job failed as vertices failed. " +
-            " failedVertices:" + job.numFailedVertices +
-            " killedVertices:" + job.numKilledVertices;
-        LOG.info(diagnosticMsg);
-        job.addDiagnostic(diagnosticMsg);
-        job.abortJob(DAGStatus.State.FAILED);
-        return job.finished(DAGState.FAILED);
-      }
-
-      DAGState jobCompleteSuccess = DAGImpl.checkJobCompleteSuccess(job);
-      if (jobCompleteSuccess != null) {
-        return jobCompleteSuccess;
-      }
-
-      //return the current state, Job not finished yet
-      return job.getInternalState();
-    }
-
     private void vertexSucceeded(DAGImpl job, Vertex vertex) {
-      job.numCompletedVertices++;
+      job.numSuccessfulVertices++;
       // TODO: Metrics
       //job.metrics.completedTask(task);
     }
@@ -1084,33 +1084,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
   }
 
-  // Transition class for handling jobs with no tasks
+  // Transition class for handling jobs with no vertices
   static class JobNoTasksCompletedTransition implements
   MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
 
     @Override
-    public DAGState transition(DAGImpl job, DAGEvent event) {
-      DAGState jobCompleteSuccess = DAGImpl.checkJobCompleteSuccess(job);
-      if (jobCompleteSuccess != null) {
-        return jobCompleteSuccess;
-      }
-
-      // Return the current state, Job not finished yet
-      return job.getInternalState();
-    }
-  }
-
-  private static class KillWaitTaskCompletedTransition extends
-      VertexCompletedTransition {
-    @Override
-    protected DAGState checkJobForCompletion(DAGImpl job) {
-      if (job.numCompletedVertices == job.vertices.size()) {
-        job.setFinishTime();
-        job.abortJob(DAGStatus.State.KILLED);
-        return job.finished(DAGState.KILLED);
-      }
-      //return the current state, Job not finished yet
-      return job.getInternalState();
+    public DAGState transition(DAGImpl dag, DAGEvent event) {
+      return checkJobForCompletion(dag);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6f27ac07/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index dd621fe..f74b172 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1078,7 +1078,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     @Override
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
-      vertex.completedTaskCount++;// TEZ-39 this is a bug
+      vertex.completedTaskCount++;
       LOG.info("Num completed Tasks: " + vertex.completedTaskCount);
       VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event;
       Task task = vertex.tasks.get(taskEvent.getTaskID());