You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/04/30 03:43:22 UTC
svn commit: r1477447 -
/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
Author: bikas
Date: Tue Apr 30 01:43:21 2013
New Revision: 1477447
URL: http://svn.apache.org/r1477447
Log:
TEZ-39 Fix VertexImpl.checkVertexCompleteSuccess (bikas)
Modified:
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java?rev=1477447&r1=1477446&r2=1477447&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java Tue Apr 30 01:43:21 2013
@@ -87,7 +87,6 @@ import org.apache.tez.dag.app.dag.event.
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
-import org.apache.tez.engine.common.security.JobTokenSecretManager;
import org.apache.tez.engine.records.TezDAGID;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
import org.apache.tez.engine.records.TezTaskAttemptID;
@@ -231,10 +230,10 @@ public class VertexImpl implements org.a
(VertexState.KILL_WAIT,
EnumSet.of(VertexState.KILL_WAIT, VertexState.KILLED),
VertexEventType.V_TASK_COMPLETED,
- new KillWaitTaskCompletedTransition())
+ new TaskCompletedTransition())
.addTransition(VertexState.KILL_WAIT, VertexState.KILL_WAIT,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
- TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
+ TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) // TODO shouldnt be done for KILL_WAIT vertex
.addTransition(VertexState.KILL_WAIT, VertexState.KILL_WAIT,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
@@ -673,39 +672,49 @@ public class VertexImpl implements org.a
return FileSystem.get(conf);
}
- static VertexState checkVertexCompleteSuccess(VertexImpl vertex) {
- // TODO TEZ-39 this vertex is definitely buggy as completed includes
- // killed/failed check for vertex success
- if (vertex.completedTaskCount == vertex.tasks.size()) {
- if (vertex.failedTaskCount > 0) {
- try {
- vertex.committer.abortVertex(VertexStatus.State.FAILED);
- } catch (IOException e) {
- LOG.error("Failed to do abort on vertex, name=" + vertex.getName(),
- e);
+ static VertexState checkVertexForCompletion(VertexImpl vertex) {
+ //check for vertex failure first
+ if (vertex.failedTaskCount > 1) {
+ vertex.setFinishTime();
+
+ String diagnosticMsg = "Vertex failed as tasks failed. "
+ + "failedTasks:"
+ + vertex.failedTaskCount;
+ LOG.info(diagnosticMsg);
+ vertex.addDiagnostic(diagnosticMsg);
+ vertex.abortVertex(VertexStatus.State.FAILED);
+ vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
+ .getVertexId(), VertexState.FAILED));
+ return vertex.finished(VertexState.FAILED);
+ }
+
+ if(vertex.succeededTaskCount == vertex.tasks.size()) {
+ try {
+ if (!vertex.committed.getAndSet(true)) {
+ // commit only once
+ vertex.committer.commitVertex();
}
+ } catch (IOException e) {
+ LOG.error("Failed to do commit on vertex, name=" + vertex.getName(), e);
vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
.getVertexId(), VertexState.FAILED));
return vertex.finished(VertexState.FAILED);
- } else {
- try {
- if (!vertex.committed.getAndSet(true)) {
- // commit only once
- vertex.committer.commitVertex();
- }
- } catch (IOException e) {
- LOG.error("Failed to do commit on vertex, name=" + vertex.getName(), e);
- vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
- .getVertexId(), VertexState.FAILED));
- return vertex.finished(VertexState.FAILED);
- }
- vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
- .getVertexId(), vertex.getState()));
- return vertex.finished(VertexState.SUCCEEDED);
}
+ vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
+ .getVertexId(), vertex.getState()));
+ return vertex.finished(VertexState.SUCCEEDED);
}
- // TODO: what if one of the tasks failed?
- return null;
+
+ if (vertex.completedTaskCount == vertex.tasks.size()) {
+ // this means the vertex has some killed tasks
+ assert vertex.killedTaskCount > 0;
+ vertex.setFinishTime();
+ vertex.abortVertex(VertexStatus.State.KILLED);
+ return vertex.finished(VertexState.KILLED);
+ }
+
+ //return the current state, Vertex not finished yet
+ return vertex.getInternalState();
}
VertexState finished(VertexState finalState) {
@@ -1033,6 +1042,7 @@ public class VertexImpl implements org.a
}
}
+ // TODO Why is TA event coming directly to Vertex instead of TA -> Task -> Vertex
private static class TaskAttemptCompletedEventTransition implements
SingleArcTransition<VertexImpl, VertexEvent> {
@Override
@@ -1107,39 +1117,13 @@ public class VertexImpl implements org.a
}
vertex.vertexScheduler.onVertexCompleted();
- VertexState state = checkVertexForCompletion(vertex);
+ VertexState state = VertexImpl.checkVertexForCompletion(vertex);
if(state == VertexState.SUCCEEDED) {
vertex.vertexScheduler.onVertexCompleted();
}
return state;
}
- protected VertexState checkVertexForCompletion(VertexImpl vertex) {
- //check for vertex failure
- if (vertex.failedTaskCount > 1) {
- vertex.setFinishTime();
-
- String diagnosticMsg = "Vertex failed as tasks failed. "
- + "failedTasks:"
- + vertex.failedTaskCount;
- LOG.info(diagnosticMsg);
- vertex.addDiagnostic(diagnosticMsg);
- vertex.abortVertex(VertexStatus.State.FAILED);
- vertex.eventHandler.handle(new DAGEventVertexCompleted(vertex
- .getVertexId(), VertexState.FAILED));
- return vertex.finished(VertexState.FAILED);
- }
-
- VertexState vertexCompleteSuccess =
- VertexImpl.checkVertexCompleteSuccess(vertex);
- if (vertexCompleteSuccess != null) {
- return vertexCompleteSuccess;
- }
-
- //return the current state, Vertex not finished yet
- return vertex.getInternalState();
- }
-
private void taskSucceeded(VertexImpl vertex, Task task) {
vertex.succeededTaskCount++;
// TODO Metrics
@@ -1168,7 +1152,7 @@ public class VertexImpl implements org.a
@Override
public VertexState transition(VertexImpl vertex, VertexEvent event) {
VertexState vertexCompleteSuccess =
- VertexImpl.checkVertexCompleteSuccess(vertex);
+ VertexImpl.checkVertexForCompletion(vertex);
if (vertexCompleteSuccess != null) {
return vertexCompleteSuccess;
}
@@ -1188,20 +1172,6 @@ public class VertexImpl implements org.a
}
}
- private static class KillWaitTaskCompletedTransition extends
- TaskCompletedTransition {
- @Override
- protected VertexState checkVertexForCompletion(VertexImpl vertex) {
- if (vertex.completedTaskCount == vertex.tasks.size()) {
- vertex.setFinishTime();
- vertex.abortVertex(VertexStatus.State.KILLED);
- return vertex.finished(VertexState.KILLED);
- }
- //return the current state, Job not finished yet
- return vertex.getInternalState();
- }
- }
-
private void addDiagnostic(String diag) {
diagnostics.add(diag);
}