You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/05/09 03:30:15 UTC
git commit: TEZ-108. DAG should track numCompleted and numSuccessful vertices separately.
Updated Branches:
refs/heads/TEZ-1 df1038e1a -> 6f27ac079
TEZ-108. DAG should track numCompleted and numSuccessful vertices separately.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6f27ac07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6f27ac07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6f27ac07
Branch: refs/heads/TEZ-1
Commit: 6f27ac079065334c8a72ff63fb5a81bb41d794ce
Parents: df1038e
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed May 8 16:33:32 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed May 8 18:27:31 2013 -0700
----------------------------------------------------------------------
.../main/java/org/apache/tez/dag/app/dag/DAG.java | 2 +-
.../org/apache/tez/dag/app/dag/impl/DAGImpl.java | 114 ++++++---------
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 2 +-
3 files changed, 49 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6f27ac07/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index a3fb3b7..03bf570 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -52,7 +52,7 @@ public interface DAG {
Vertex getVertex(TezVertexID vertexId);
List<String> getDiagnostics();
int getTotalVertices();
- int getCompletedVertices();
+ int getSuccessfulVertices();
float getProgress();
boolean isUber();
String getUserName();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6f27ac07/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index c37a9e9..0e1206f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -85,7 +85,6 @@ import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.common.security.JobTokenSecretManager;
import org.apache.tez.engine.common.security.TokenCache;
import org.apache.tez.engine.records.TezDAGID;
-import org.apache.tez.engine.records.TezTaskAttemptID;
import org.apache.tez.engine.records.TezVertexID;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -117,12 +116,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private DAGScheduler dagScheduler;
- /**
- * maps nodes to tasks that have run on those nodes
- */
- private final HashMap<NodeId, List<TezTaskAttemptID>>
- nodesToSucceededTaskAttempts = new HashMap<NodeId, List<TezTaskAttemptID>>();
-
private final EventHandler eventHandler;
// TODO Metrics
//private final MRAppMetrics metrics;
@@ -232,7 +225,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
(DAGState.KILL_WAIT,
EnumSet.of(DAGState.KILL_WAIT, DAGState.KILLED),
DAGEventType.DAG_VERTEX_COMPLETED,
- new KillWaitTaskCompletedTransition())
+ new VertexCompletedTransition())
.addTransition(DAGState.KILL_WAIT, DAGState.KILL_WAIT,
DAGEventType.DAG_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
@@ -307,6 +300,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
//changing fields while the job is running
private int numVertices;
private int numCompletedVertices = 0;
+ private int numSuccessfulVertices = 0;
private int numFailedVertices = 0;
private int numKilledVertices = 0;
private boolean isUber = false;
@@ -619,22 +613,47 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
return FileSystem.get(conf);
}
- static DAGState checkJobCompleteSuccess(DAGImpl dag) {
- // check for dag success
+ static DAGState checkJobForCompletion(DAGImpl dag) {
+
LOG.info("ZZZZ: Checking dag completion"
+ ", numCompletedVertices=" + dag.numCompletedVertices
+ + ", numSuccessfulVertices=" + dag.numSuccessfulVertices
+ ", numFailedVertices=" + dag.numFailedVertices
+ ", numKilledVertices=" + dag.numKilledVertices
+ ", numVertices=" + dag.numVertices);
- if (dag.numCompletedVertices == dag.vertices.size()) {
- // TODO: Maybe set cleanup progress. Otherwise dag progress will
- // always stay at 0.95 when reported from an AM.
- // TODO DAG committer
+ if (dag.numFailedVertices > 0) {
+ dag.setFinishTime();
+ String diagnosticMsg = "DAG failed as vertices failed. " +
+ " failedVertices:" + dag.numFailedVertices +
+ " killedVertices:" + dag.numKilledVertices;
+ LOG.info(diagnosticMsg);
+ dag.addDiagnostic(diagnosticMsg);
+ dag.abortJob(DAGStatus.State.FAILED);
+ return dag.finished(DAGState.FAILED);
+ }
+
+ if(dag.numSuccessfulVertices == dag.numVertices) {
+ dag.setFinishTime();
dag.logJobHistoryFinishedEvent();
return dag.finished(DAGState.SUCCEEDED);
}
- return null;
+
+ if (dag.numCompletedVertices == dag.numVertices) {
+ // this means the dag has some killed vertices
+ String diagnosticMsg = "DAG killed. " +
+ " failedVertices:" + dag.numFailedVertices +
+ " killedVertices:" + dag.numKilledVertices;
+ LOG.info(diagnosticMsg);
+ dag.addDiagnostic(diagnosticMsg);
+ assert dag.numKilledVertices > 0;
+ dag.setFinishTime();
+ dag.abortJob(DAGStatus.State.KILLED);
+ return dag.finished(DAGState.KILLED);
+ }
+
+ //return the current state, Job not finished yet
+ return dag.getInternalState();
}
DAGState finished(DAGState finalState) {
@@ -690,10 +709,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
@Override
- public int getCompletedVertices() {
+ public int getSuccessfulVertices() {
readLock.lock();
try {
- return numCompletedVertices;
+ return numSuccessfulVertices;
} finally {
readLock.unlock();
}
@@ -1023,6 +1042,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
+ ", vertexState=" + vertexEvent.getVertexState());
Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
+ job.numCompletedVertices++;
if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
vertexSucceeded(job, vertex);
} else if (vertexEvent.getVertexState() == VertexState.FAILED) {
@@ -1031,40 +1051,20 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
vertexKilled(job, vertex);
}
- LOG.info("ZZZZ: Num completed vertices: " + job.numCompletedVertices
- + ", Num failed vertices: " + job.numFailedVertices
- + ", Num killed vertices: " + job.numKilledVertices);
+ LOG.info("ZZZZ: Vertex completed."
+ + ", numCompletedVertices=" + job.numCompletedVertices
+ + ", numSuccessfulVertices=" + job.numSuccessfulVertices
+ + ", numFailedVertices=" + job.numFailedVertices
+ + ", numKilledVertices=" + job.numKilledVertices
+ + ", numVertices=" + job.numVertices);
job.dagScheduler.vertexCompleted(vertex);
return checkJobForCompletion(job);
}
- protected DAGState checkJobForCompletion(DAGImpl job) {
- //check for Job failure
- if (job.numFailedVertices > 0) {
- job.setFinishTime();
-
- String diagnosticMsg = "Job failed as vertices failed. " +
- " failedVertices:" + job.numFailedVertices +
- " killedVertices:" + job.numKilledVertices;
- LOG.info(diagnosticMsg);
- job.addDiagnostic(diagnosticMsg);
- job.abortJob(DAGStatus.State.FAILED);
- return job.finished(DAGState.FAILED);
- }
-
- DAGState jobCompleteSuccess = DAGImpl.checkJobCompleteSuccess(job);
- if (jobCompleteSuccess != null) {
- return jobCompleteSuccess;
- }
-
- //return the current state, Job not finished yet
- return job.getInternalState();
- }
-
private void vertexSucceeded(DAGImpl job, Vertex vertex) {
- job.numCompletedVertices++;
+ job.numSuccessfulVertices++;
// TODO: Metrics
//job.metrics.completedTask(task);
}
@@ -1084,33 +1084,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
}
- // Transition class for handling jobs with no tasks
+ // Transition class for handling jobs with no vertices
static class JobNoTasksCompletedTransition implements
MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
@Override
- public DAGState transition(DAGImpl job, DAGEvent event) {
- DAGState jobCompleteSuccess = DAGImpl.checkJobCompleteSuccess(job);
- if (jobCompleteSuccess != null) {
- return jobCompleteSuccess;
- }
-
- // Return the current state, Job not finished yet
- return job.getInternalState();
- }
- }
-
- private static class KillWaitTaskCompletedTransition extends
- VertexCompletedTransition {
- @Override
- protected DAGState checkJobForCompletion(DAGImpl job) {
- if (job.numCompletedVertices == job.vertices.size()) {
- job.setFinishTime();
- job.abortJob(DAGStatus.State.KILLED);
- return job.finished(DAGState.KILLED);
- }
- //return the current state, Job not finished yet
- return job.getInternalState();
+ public DAGState transition(DAGImpl dag, DAGEvent event) {
+ return checkJobForCompletion(dag);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6f27ac07/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index dd621fe..f74b172 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1078,7 +1078,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public VertexState transition(VertexImpl vertex, VertexEvent event) {
- vertex.completedTaskCount++;// TEZ-39 this is a bug
+ vertex.completedTaskCount++;
LOG.info("Num completed Tasks: " + vertex.completedTaskCount);
VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event;
Task task = vertex.tasks.get(taskEvent.getTaskID());