You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:42:54 UTC
[16/43] tez git commit: TEZ-2412. Should kill vertex in
DAGImpl#VertexRerunWhileCommitting (zjffdu)
TEZ-2412. Should kill vertex in DAGImpl#VertexRerunWhileCommitting (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6e6ad706
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6e6ad706
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6e6ad706
Branch: refs/heads/TEZ-2003
Commit: 6e6ad706f5b6611058541c3bf072343bf002ced5
Parents: 4a6808c
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri May 8 19:55:53 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri May 8 19:55:53 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 20 ++++++++++++++++----
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 3 +--
.../apache/tez/dag/app/dag/impl/TestCommit.java | 14 ++++++++++++--
4 files changed, 30 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/6e6ad706/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3520768..185e1b0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@ INCOMPATIBLE CHANGES
Default max limit increased. Should not affect existing users.
ALL CHANGES:
+ TEZ-2412. Should kill vertex in DAGImpl#VertexRerunWhileCommitting
TEZ-2410. VertexGroupCommitFinishedEvent & VertexCommitStartedEvent is not logged correctly
TEZ-776. Reduce AM mem usage caused by storing TezEvents
TEZ-2423. Tez UI: Remove Attempt Index column from task->attempts page
http://git-wip-us.apache.org/repos/asf/tez/blob/6e6ad706/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 1726c18..0a87241 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1206,6 +1206,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
return dag.finished(DAGState.SUCCEEDED);
}
} else {
+ // check commits before move to COMPLETED state.
if (dag.commitFutures.isEmpty()) {
return finishWithTerminationCause(dag);
} else {
@@ -1218,7 +1219,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
return dag.getInternalState();
}
- // triggered by commit_complete
+ // triggered by commit_complete; checkCommitsForCompletion should only be called in COMMITTING/TERMINATING state
static DAGState checkCommitsForCompletion(DAGImpl dag) {
LOG.info("Checking commits for DAG completion"
+ ", numCompletedVertices=" + dag.numCompletedVertices
@@ -1240,8 +1241,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
return dag.finished(DAGState.SUCCEEDED);
}
} else {
- if (!dag.commitFutures.isEmpty()) {
- // pending commits are running
+ Preconditions.checkState(dag.getState() == DAGState.TERMINATING
+ || dag.getState() == DAGState.COMMITTING,
+ "DAG should be in COMMITTING/TERMINATING state, but in " + dag.getState());
+ if (!dag.commitFutures.isEmpty() || dag.numCompletedVertices != dag.numVertices) {
+ // pending commits are running or still some vertices are not completed
return DAGState.TERMINATING;
} else {
return finishWithTerminationCause(dag);
@@ -2155,8 +2159,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
@Override
public void transition(DAGImpl dag, DAGEvent event) {
LOG.info("Vertex rerun while dag it is COMMITTING");
+ DAGEventVertexReRunning rerunEvent = (DAGEventVertexReRunning)event;
+ Vertex vertex = dag.getVertex(rerunEvent.getVertexId());
+ dag.reRunningVertices.add(vertex.getVertexId());
+ dag.numSuccessfulVertices--;
+ dag.numCompletedVertices--;
+ dag.addDiagnostic("Vertex re-running"
+ + ", vertexName=" + vertex.getName()
+ + ", vertexId=" + vertex.getVertexId());
dag.cancelCommits();
- dag.trySetTerminationCause(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING);
+ dag.enactKill(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING, VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6e6ad706/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3a9558d..6b208b0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -3669,6 +3669,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
case INIT_FAILURE:
case INTERNAL_ERROR:
case AM_USERCODE_FAILURE:
+ case VERTEX_RERUN_IN_COMMITTING:
case VERTEX_RERUN_AFTER_COMMIT:
case OTHER_VERTEX_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_VERTEX_FAILURE); break;
default://should not occur
@@ -3685,8 +3686,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
@Override
public void transition(VertexImpl vertex, VertexEvent event) {
-
-
VertexEventTermination vet = (VertexEventTermination) event;
VertexTerminationCause trigger = vet.getTerminationCause();
String msg = "Vertex received Kill while in COMMITTING state, terminationCause="
http://git-wip-us.apache.org/repos/asf/tez/blob/6e6ad706/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
index 8fc29c2..3d3bca4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -1703,15 +1703,25 @@ public class TestCommit {
v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
TaskState.SUCCEEDED));
waitUntil(dag, DAGState.COMMITTING);
- dag.handle(new DAGEventVertexReRunning(v1.getVertexId()));
+ TezTaskID newTaskId = TezTaskID.getInstance(v1.getVertexId(), 1);
+ v1.handle(new VertexEventTaskReschedule(newTaskId));
+ // dag is in TERMINATING, wait for the complete of its rescheduled tasks
+ waitUntil(dag, DAGState.TERMINATING);
+ waitUntil(v1, VertexState.TERMINATING);
+ // reschedueled task is killed
+ v1.handle(new VertexEventTaskCompleted(newTaskId, TaskState.KILLED));
waitUntil(dag, DAGState.FAILED);
+ Assert.assertEquals(VertexState.FAILED, v1.getState());
+ Assert.assertEquals(DAGState.FAILED, dag.getState());
+ Assert.assertEquals(VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING, v1.getTerminationCause());
Assert.assertEquals(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING, dag.getTerminationCause());
Assert.assertTrue(dag.commitFutures.isEmpty());
historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
- historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ // VertexFinishedEvent is logged twice due to vertex-rerun
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 2);
historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);