You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/02/02 03:46:09 UTC
tez git commit: TEZ-1895. Vertex reRunning should decrease
successfulMembers of VertexGroupInfo (zjffdu)
Repository: tez
Updated Branches:
refs/heads/master ad6bf07eb -> cfa637a16
TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cfa637a1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cfa637a1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cfa637a1
Branch: refs/heads/master
Commit: cfa637a16fa01b197c0310e03ef4a6e19883aaf1
Parents: ad6bf07
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Feb 2 10:45:32 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Mon Feb 2 10:45:32 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/dag/DAGTerminationCause.java | 3 ++
.../tez/dag/app/dag/VertexTerminationCause.java | 3 ++
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 25 ++++++---
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 23 +++++++++
.../tez/dag/app/dag/impl/TestDAGImpl.java | 53 +++++++++++++++++++-
6 files changed, 101 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 03b0624..24c5b32 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1895. Vertex reRunning should decrease successfulMembers of VertexGroupInfo.
TEZ-1999. IndexOutOfBoundsException during merge.
TEZ-2000. Source vertex exists error during DAG submission.
TEZ-2008. Add methods to SecureShuffleUtils to verify a reply based on a provided Key.
http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
index d01fb2f..5ae96a1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
@@ -39,6 +39,9 @@ public enum DAGTerminationCause {
/** DAG failed during output commit. */
COMMIT_FAILURE,
+ /** In some cases a vertex cannot be rerun, e.g. when its output has already been committed as a shared output of a vertex group. */
+ VERTEX_RERUN_AFTER_COMMIT,
+
/** DAG failed while trying to write recovery events */
RECOVERY_FAILURE,
http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
index 4bfe001..2eeae3c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
@@ -43,6 +43,9 @@ public enum VertexTerminationCause {
/** This vertex failed during commit. */
COMMIT_FAILURE,
+ /** In some cases a vertex cannot be rerun, e.g. when its output has already been committed as a shared output of a vertex group. */
+ VERTEX_RERUN_AFTER_COMMIT,
+
/** This vertex failed as it had invalid number tasks. */
INVALID_NUM_OF_TASKS,
http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index f4e5bad..aa7723b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1083,6 +1083,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
dag.addDiagnostic(diagnosticMsg);
return dag.finished(DAGState.FAILED);
}
+ if(dag.terminationCause == DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT ){
+ String diagnosticMsg = "DAG failed due to vertex rerun after commit." +
+ " failedVertices:" + dag.numFailedVertices +
+ " killedVertices:" + dag.numKilledVertices;
+ LOG.info(diagnosticMsg);
+ dag.addDiagnostic(diagnosticMsg);
+ return dag.finished(DAGState.FAILED);
+ }
if(dag.terminationCause == DAGTerminationCause.RECOVERY_FAILURE ){
String diagnosticMsg = "DAG failed due to failure in recovery handling." +
" failedVertices:" + dag.numFailedVertices +
@@ -1738,9 +1746,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
public DAGState transition(DAGImpl job, DAGEvent event) {
DAGEventVertexReRunning vertexEvent = (DAGEventVertexReRunning) event;
Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
- job.numCompletedVertices--;
boolean failed = job.vertexReRunning(vertex);
-
+ if (!failed) {
+ job.numCompletedVertices--;
+ }
LOG.info("Vertex " + vertex.getLogIdentifier() + " re-running."
+ ", numCompletedVertices=" + job.numCompletedVertices
@@ -1848,11 +1857,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
if (groupList != null) {
for (VertexGroupInfo groupInfo : groupList) {
if (groupInfo.committed) {
- LOG.info("Aborting job as committed vertex: "
- + vertex.getLogIdentifier() + " is re-running");
- enactKill(DAGTerminationCause.COMMIT_FAILURE,
- VertexTerminationCause.COMMIT_FAILURE);
+ String msg = "Aborting job as committed vertex: "
+ + vertex.getLogIdentifier() + " is re-running";
+ LOG.info(msg);
+ addDiagnostic(msg);
+ enactKill(DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT,
+ VertexTerminationCause.VERTEX_RERUN_AFTER_COMMIT);
return true;
+ } else {
+ groupInfo.successfulMembers--;
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 577c98b..c3f4ae7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1826,6 +1826,28 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.abortVertex(State.FAILED);
return vertex.finished(VertexState.FAILED);
}
+ else if (vertex.terminationCause == VertexTerminationCause.COMMIT_FAILURE) {
+ vertex.setFinishTime();
+ String diagnosticMsg = "Vertex failed/killed due to COMMIT_FAILURE failed. "
+ + "failedTasks:"
+ + vertex.failedTaskCount
+ + " killedTasks:"
+ + vertex.killedTaskCount;
+ LOG.info(diagnosticMsg);
+ vertex.abortVertex(State.FAILED);
+ return vertex.finished(VertexState.FAILED);
+ }
+ else if (vertex.terminationCause == VertexTerminationCause.VERTEX_RERUN_AFTER_COMMIT) {
+ vertex.setFinishTime();
+ String diagnosticMsg = "Vertex failed/killed due to invalid rerun failed. "
+ + "failedTasks:"
+ + vertex.failedTaskCount
+ + " killedTasks:"
+ + vertex.killedTaskCount;
+ LOG.info(diagnosticMsg);
+ vertex.abortVertex(State.FAILED);
+ return vertex.finished(VertexState.FAILED);
+ }
else {
//should never occur
throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex:" + vertex.logIdentifier
@@ -3472,6 +3494,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
case INIT_FAILURE:
case INTERNAL_ERROR:
case AM_USERCODE_FAILURE:
+ case VERTEX_RERUN_AFTER_COMMIT:
case OTHER_VERTEX_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_VERTEX_FAILURE); break;
default://should not occur
throw new TezUncheckedException("VertexKilledTransition: event.terminationCause is unexpected: " + trigger);
http://git-wip-us.apache.org/repos/asf/tez/blob/cfa637a1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 7c4d715..cae9059 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -91,6 +91,7 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
+import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -774,6 +775,9 @@ public class TestDAGImpl {
doReturn(appAttemptId.getApplicationId())
.when(groupAppContext).getApplicationID();
doReturn(historyEventHandler).when(groupAppContext).getHistoryHandler();
+
+ // reset totalCommitCounter to 0
+ TotalCountingOutputCommitter.totalCommitCounter = 0;
taskEventDispatcher = new TaskEventDispatcher();
dispatcher.register(TaskEventType.class, taskEventDispatcher);
taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
@@ -1080,7 +1084,54 @@ public class TestDAGImpl {
Assert.assertEquals(DAGState.SUCCEEDED, groupDag.getState());
Assert.assertEquals(2, TotalCountingOutputCommitter.totalCommitCounter);
}
-
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testGroupDAGWithVertexReRunning() {
+ conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+ initDAG(groupDag);
+ startDAG(groupDag);
+ dispatcher.await();
+
+ Vertex v1 = groupDag.getVertex("vertex1");
+ Vertex v2 = groupDag.getVertex("vertex2");
+ dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
+ dispatcher.getEventHandler().handle(new DAGEventVertexReRunning(v1.getVertexId()));
+ dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
+ dispatcher.await();
+ // the group commit should not happen yet because vertex1 is re-running
+ Assert.assertEquals(0, TotalCountingOutputCommitter.totalCommitCounter);
+
+ dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
+ dispatcher.await();
+ // the group commit happens now
+ Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);
+ Assert.assertEquals(2, groupDag.getSuccessfulVertices());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testGroupDAGWithVertexReRunningAfterCommit() {
+ conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+ initDAG(groupDag);
+ startDAG(groupDag);
+ dispatcher.await();
+
+ Vertex v1 = groupDag.getVertex("vertex1");
+ Vertex v2 = groupDag.getVertex("vertex2");
+ dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
+ dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
+ dispatcher.await();
+ // vertex group commit happens
+ Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);
+
+ // the DAG fails when a vertex re-runs after the vertex group commit has completed.
+ dispatcher.getEventHandler().handle(new DAGEventVertexReRunning(v1.getVertexId()));
+ dispatcher.await();
+ Assert.assertEquals(DAGState.FAILED, groupDag.getState());
+ Assert.assertEquals(DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT, groupDag.getTerminationCause());
+ }
+
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testDAGCompletionWithCommitSuccess() {