You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/02/02 03:46:09 UTC

tez git commit: TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo (zjffdu)

Repository: tez
Updated Branches:
  refs/heads/master ad6bf07eb -> cfa637a16


TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cfa637a1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cfa637a1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cfa637a1

Branch: refs/heads/master
Commit: cfa637a16fa01b197c0310e03ef4a6e19883aaf1
Parents: ad6bf07
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Feb 2 10:45:32 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Mon Feb 2 10:45:32 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/app/dag/DAGTerminationCause.java    |  3 ++
 .../tez/dag/app/dag/VertexTerminationCause.java |  3 ++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 25 ++++++---
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 23 +++++++++
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 53 +++++++++++++++++++-
 6 files changed, 101 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 03b0624..24c5b32 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo.
   TEZ-1999. IndexOutOfBoundsException during merge.
   TEZ-2000. Source vertex exists error during DAG submission.
   TEZ-2008. Add methods to SecureShuffleUtils to verify a reply based on a provided Key.

http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
index d01fb2f..5ae96a1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
@@ -39,6 +39,9 @@ public enum DAGTerminationCause {
   /** DAG failed during output commit. */
   COMMIT_FAILURE,
 
+  /** In some cases a vertex cannot rerun, e.g. when its output has already been committed as a shared output of a vertex group. */
+  VERTEX_RERUN_AFTER_COMMIT,
+
   /** DAG failed while trying to write recovery events */
   RECOVERY_FAILURE,
 

http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
index 4bfe001..2eeae3c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
@@ -43,6 +43,9 @@ public enum VertexTerminationCause {
   /** This vertex failed during commit. */
   COMMIT_FAILURE,
 
+  /** In some cases a vertex cannot rerun, e.g. when its output has already been committed as a shared output of a vertex group. */
+  VERTEX_RERUN_AFTER_COMMIT,
+
   /** This vertex failed as it had invalid number tasks. */
   INVALID_NUM_OF_TASKS, 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index f4e5bad..aa7723b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1083,6 +1083,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         dag.addDiagnostic(diagnosticMsg);
         return dag.finished(DAGState.FAILED);
       }
+      if(dag.terminationCause == DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT ){
+        String diagnosticMsg = "DAG failed due to vertex rerun after commit." +
+            " failedVertices:" + dag.numFailedVertices +
+            " killedVertices:" + dag.numKilledVertices;
+        LOG.info(diagnosticMsg);
+        dag.addDiagnostic(diagnosticMsg);
+        return dag.finished(DAGState.FAILED);
+      }
       if(dag.terminationCause == DAGTerminationCause.RECOVERY_FAILURE ){
         String diagnosticMsg = "DAG failed due to failure in recovery handling." +
             " failedVertices:" + dag.numFailedVertices +
@@ -1738,9 +1746,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     public DAGState transition(DAGImpl job, DAGEvent event) {
       DAGEventVertexReRunning vertexEvent = (DAGEventVertexReRunning) event;
       Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
-      job.numCompletedVertices--;
       boolean failed = job.vertexReRunning(vertex);
-
+      if (!failed) {
+        job.numCompletedVertices--;
+      }
 
       LOG.info("Vertex " + vertex.getLogIdentifier() + " re-running."
           + ", numCompletedVertices=" + job.numCompletedVertices
@@ -1848,11 +1857,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       if (groupList != null) {
         for (VertexGroupInfo groupInfo : groupList) {
           if (groupInfo.committed) {
-            LOG.info("Aborting job as committed vertex: "
-                + vertex.getLogIdentifier() + " is re-running");
-            enactKill(DAGTerminationCause.COMMIT_FAILURE,
-                VertexTerminationCause.COMMIT_FAILURE);
+            String msg = "Aborting job as committed vertex: "
+                + vertex.getLogIdentifier() + " is re-running";
+            LOG.info(msg);
+            addDiagnostic(msg);
+            enactKill(DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT,
+                VertexTerminationCause.VERTEX_RERUN_AFTER_COMMIT);
             return true;
+          } else {
+            groupInfo.successfulMembers--;
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 577c98b..c3f4ae7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1826,6 +1826,28 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         vertex.abortVertex(State.FAILED);
         return vertex.finished(VertexState.FAILED);
       }
+      else if (vertex.terminationCause == VertexTerminationCause.COMMIT_FAILURE) {
+        vertex.setFinishTime();
+        String diagnosticMsg = "Vertex failed/killed due to COMMIT_FAILURE failed. "
+            + "failedTasks:"
+            + vertex.failedTaskCount
+            + " killedTasks:"
+            + vertex.killedTaskCount;
+        LOG.info(diagnosticMsg);
+        vertex.abortVertex(State.FAILED);
+        return vertex.finished(VertexState.FAILED);
+      }
+      else if (vertex.terminationCause == VertexTerminationCause.VERTEX_RERUN_AFTER_COMMIT) {
+        vertex.setFinishTime();
+        String diagnosticMsg = "Vertex failed/killed due to invalid rerun failed. "
+            + "failedTasks:"
+            + vertex.failedTaskCount
+            + " killedTasks:"
+            + vertex.killedTaskCount;
+        LOG.info(diagnosticMsg);
+        vertex.abortVertex(State.FAILED);
+        return vertex.finished(VertexState.FAILED);
+      }
       else {
         //should never occur
         throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex:" + vertex.logIdentifier
@@ -3472,6 +3494,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         case INIT_FAILURE:
         case INTERNAL_ERROR:
         case AM_USERCODE_FAILURE:
+        case VERTEX_RERUN_AFTER_COMMIT:
         case OTHER_VERTEX_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_VERTEX_FAILURE); break;
         default://should not occur
           throw new TezUncheckedException("VertexKilledTransition: event.terminationCause is unexpected: " + trigger);

http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 7c4d715..cae9059 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -91,6 +91,7 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
+import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -774,6 +775,9 @@ public class TestDAGImpl {
     doReturn(appAttemptId.getApplicationId())
         .when(groupAppContext).getApplicationID();
     doReturn(historyEventHandler).when(groupAppContext).getHistoryHandler();
+
+    // reset totalCommitCounter to 0
+    TotalCountingOutputCommitter.totalCommitCounter = 0;
     taskEventDispatcher = new TaskEventDispatcher();
     dispatcher.register(TaskEventType.class, taskEventDispatcher);
     taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
@@ -1080,7 +1084,54 @@ public class TestDAGImpl {
     Assert.assertEquals(DAGState.SUCCEEDED, groupDag.getState());
     Assert.assertEquals(2, TotalCountingOutputCommitter.totalCommitCounter);
   }  
-  
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testGroupDAGWithVertexReRunning() {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+    initDAG(groupDag);
+    startDAG(groupDag);
+    dispatcher.await();
+    // Scenario: vertex1 succeeds, then re-runs before vertex2 succeeds (group not yet committed).
+    Vertex v1 = groupDag.getVertex("vertex1");
+    Vertex v2 = groupDag.getVertex("vertex2");
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(new DAGEventVertexReRunning(v1.getVertexId()));
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
+    dispatcher.await();
+    // no commit yet: vertex1 is re-running, so not every group member has succeeded
+    Assert.assertEquals(0, TotalCountingOutputCommitter.totalCommitCounter);
+
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
+    dispatcher.await();
+    // vertex1 succeeds again, completing the group, so the commit happens exactly once
+    Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);
+    Assert.assertEquals(2, groupDag.getSuccessfulVertices());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testGroupDAGWithVertexReRunningAfterCommit() {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+    initDAG(groupDag);
+    startDAG(groupDag);
+    dispatcher.await();
+    // Scenario: a group member re-runs after the vertex group output has already been committed.
+    Vertex v1 = groupDag.getVertex("vertex1");
+    Vertex v2 = groupDag.getVertex("vertex2");
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
+    dispatcher.await();
+    // both members succeeded, so the vertex group commit happens
+    Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);
+
+    // the DAG must fail with VERTEX_RERUN_AFTER_COMMIT once a committed group member re-runs
+    dispatcher.getEventHandler().handle(new DAGEventVertexReRunning(v1.getVertexId()));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.FAILED, groupDag.getState());
+    Assert.assertEquals(DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT, groupDag.getTerminationCause());
+  }
+
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testDAGCompletionWithCommitSuccess() {