You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:42:54 UTC

[16/43] tez git commit: TEZ-2412. Should kill vertex in DAGImpl#VertexRerunWhileCommitting (zjffdu)

TEZ-2412. Should kill vertex in DAGImpl#VertexRerunWhileCommitting (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6e6ad706
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6e6ad706
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6e6ad706

Branch: refs/heads/TEZ-2003
Commit: 6e6ad706f5b6611058541c3bf072343bf002ced5
Parents: 4a6808c
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri May 8 19:55:53 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri May 8 19:55:53 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 20 ++++++++++++++++----
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  3 +--
 .../apache/tez/dag/app/dag/impl/TestCommit.java | 14 ++++++++++++--
 4 files changed, 30 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/6e6ad706/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3520768..185e1b0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@ INCOMPATIBLE CHANGES
     Default max limit increased. Should not affect existing users.
 
 ALL CHANGES:
+  TEZ-2412. Should kill vertex in DAGImpl#VertexRerunWhileCommitting
   TEZ-2410. VertexGroupCommitFinishedEvent & VertexCommitStartedEvent is not logged correctly
   TEZ-776. Reduce AM mem usage caused by storing TezEvents
   TEZ-2423. Tez UI: Remove Attempt Index column from task->attempts page

http://git-wip-us.apache.org/repos/asf/tez/blob/6e6ad706/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 1726c18..0a87241 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1206,6 +1206,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           return dag.finished(DAGState.SUCCEEDED);
         }
       } else {
+        // check commits before move to COMPLETED state.
         if (dag.commitFutures.isEmpty()) {
           return finishWithTerminationCause(dag);
         } else {
@@ -1218,7 +1219,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     return dag.getInternalState();
   }
 
-  // triggered by commit_complete
+  // triggered by commit_complete; checkCommitsForCompletion should only be called in COMMITTING/TERMINATING
   static DAGState checkCommitsForCompletion(DAGImpl dag) {
     LOG.info("Checking commits for DAG completion"
         + ", numCompletedVertices=" + dag.numCompletedVertices
@@ -1240,8 +1241,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         return dag.finished(DAGState.SUCCEEDED);
       }
     } else {
-      if (!dag.commitFutures.isEmpty()) {
-        // pending commits are running
+      Preconditions.checkState(dag.getState() == DAGState.TERMINATING
+          || dag.getState() == DAGState.COMMITTING,
+          "DAG should be in COMMITTING/TERMINATING state, but in " + dag.getState());
+      if (!dag.commitFutures.isEmpty() || dag.numCompletedVertices != dag.numVertices) {
+        // pending commits are running or still some vertices are not completed
         return DAGState.TERMINATING;
       } else {
         return finishWithTerminationCause(dag);
@@ -2155,8 +2159,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     @Override
     public void transition(DAGImpl dag, DAGEvent event) {
       LOG.info("Vertex rerun while dag it is COMMITTING");
+      DAGEventVertexReRunning rerunEvent = (DAGEventVertexReRunning)event;
+      Vertex vertex = dag.getVertex(rerunEvent.getVertexId());
+      dag.reRunningVertices.add(vertex.getVertexId());
+      dag.numSuccessfulVertices--;
+      dag.numCompletedVertices--;
+      dag.addDiagnostic("Vertex re-running"
+          + ", vertexName=" + vertex.getName()
+          + ", vertexId=" + vertex.getVertexId());
       dag.cancelCommits();
-      dag.trySetTerminationCause(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING);
+      dag.enactKill(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING, VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/6e6ad706/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3a9558d..6b208b0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -3669,6 +3669,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         case INIT_FAILURE:
         case INTERNAL_ERROR:
         case AM_USERCODE_FAILURE:
+        case VERTEX_RERUN_IN_COMMITTING:
         case VERTEX_RERUN_AFTER_COMMIT:
         case OTHER_VERTEX_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_VERTEX_FAILURE); break;
         default://should not occur
@@ -3685,8 +3686,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {
-
-
       VertexEventTermination vet = (VertexEventTermination) event;
       VertexTerminationCause trigger = vet.getTerminationCause();
       String msg = "Vertex received Kill while in COMMITTING state, terminationCause="

http://git-wip-us.apache.org/repos/asf/tez/blob/6e6ad706/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
index 8fc29c2..3d3bca4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -1703,15 +1703,25 @@ public class TestCommit {
     v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
         TaskState.SUCCEEDED));
     waitUntil(dag, DAGState.COMMITTING);
-    dag.handle(new DAGEventVertexReRunning(v1.getVertexId()));
+    TezTaskID newTaskId = TezTaskID.getInstance(v1.getVertexId(), 1);
+    v1.handle(new VertexEventTaskReschedule(newTaskId));
+    // dag is in TERMINATING, wait for the completion of its rescheduled tasks
+    waitUntil(dag, DAGState.TERMINATING);
+    waitUntil(v1, VertexState.TERMINATING);
+    // rescheduled task is killed
+    v1.handle(new VertexEventTaskCompleted(newTaskId, TaskState.KILLED));
     waitUntil(dag, DAGState.FAILED);
+    Assert.assertEquals(VertexState.FAILED, v1.getState());
+    Assert.assertEquals(DAGState.FAILED, dag.getState());
+    Assert.assertEquals(VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING, v1.getTerminationCause());
 
     Assert.assertEquals(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING, dag.getTerminationCause());
     Assert.assertTrue(dag.commitFutures.isEmpty());
     historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
     historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
     historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
-    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    // VertexFinishedEvent is logged twice due to vertex-rerun
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 2);
     historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
     historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
     historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);