You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@tez.apache.org by hi...@apache.org on 2013/11/05 01:53:30 UTC
git commit: TEZ-591. Provide mode specific diagnostic information to the Tez client. (hitesh)
Updated Branches:
refs/heads/master a153a7762 -> 65e09f847
TEZ-591. Provide mode specific diagnostic information to the Tez client. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/65e09f84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/65e09f84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/65e09f84
Branch: refs/heads/master
Commit: 65e09f847ceced8e527fe4c1f4926854f2d3c498
Parents: a153a77
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon Nov 4 16:53:06 2013 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon Nov 4 16:53:06 2013 -0800
----------------------------------------------------------------------
.../apache/tez/dag/api/client/DAGStatus.java | 8 ++++-
.../java/org/apache/tez/dag/app/dag/Task.java | 3 ++
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 18 +++++++---
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 20 +++++++++++
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 22 +++++++-----
.../tez/dag/app/dag/impl/TestVertexImpl.java | 37 +++++++++++++-------
6 files changed, 82 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/65e09f84/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
index d61173d..8b7277f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
@@ -22,12 +22,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.util.StringUtils;
import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProtoOrBuilder;
import org.apache.tez.dag.api.records.DAGProtos.StringProgressPairProto;
import org.apache.tez.dag.api.TezUncheckedException;
public class DAGStatus {
+ private static final String LINE_SEPARATOR = System
+ .getProperty("line.separator");
+
public enum State {
SUBMITTED,
INITING,
@@ -123,7 +127,9 @@ public class DAGStatus {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("status=" + getState()
- + ", progress=" + getDAGProgress());
+ + ", progress=" + getDAGProgress()
+ + ", diagnostics="
+ + StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/65e09f84/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index 293e4c5..f7861cb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -78,4 +78,7 @@ public interface Task {
int fromEventId, int maxEvents);
public List<TezEvent> getAndClearTaskTezEvents();
+
+ public List<String> getDiagnostics();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/65e09f84/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 43479c4..0df7875 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -646,7 +646,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
new DAGHistoryEvent(finishEvt));
}
- static DAGState checkJobForCompletion(DAGImpl dag) {
+ static DAGState checkDAGForCompletion(DAGImpl dag) {
if (LOG.isDebugEnabled()) {
LOG.debug("Checking dag completion"
+ ", numCompletedVertices=" + dag.numCompletedVertices
@@ -1128,7 +1128,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
+ ", numVertices=" + job.numVertices);
// if the job has not finished but a failure/kill occurred, then force the transition to KILL_WAIT.
- DAGState state = checkJobForCompletion(job);
+ DAGState state = checkDAGForCompletion(job);
if(state == DAGState.RUNNING && forceTransitionToKillWait){
return DAGState.TERMINATING;
}
@@ -1167,21 +1167,29 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private void vertexReRunning(Vertex vertex) {
reRunningVertices.add(vertex.getVertexId());
numSuccessfulVertices--;
- addDiagnostic("Vertex re-running " + vertex.getVertexId());
+ addDiagnostic("Vertex re-running"
+ + ", vertexName=" + vertex.getName()
+ + ", vertexId=" + vertex.getVertexId());
// TODO: Metrics
//job.metrics.completedTask(task);
}
private void vertexFailed(Vertex vertex) {
numFailedVertices++;
- addDiagnostic("Vertex failed " + vertex.getVertexId());
+ addDiagnostic("Vertex failed"
+ + ", vertexName=" + vertex.getName()
+ + ", vertexId=" + vertex.getVertexId()
+ + ", diagnostics=" + vertex.getDiagnostics());
// TODO: Metrics
//job.metrics.failedTask(task);
}
private void vertexKilled(Vertex vertex) {
numKilledVertices++;
- addDiagnostic("Vertex killed " + vertex.getVertexId());
+ addDiagnostic("Vertex killed"
+ + ", vertexName=" + vertex.getName()
+ + ", vertexId=" + vertex.getVertexId()
+ + ", diagnostics=" + vertex.getDiagnostics());
// TODO: Metrics
//job.metrics.killedTask(task);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/65e09f84/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 4e7dd66..3d4703b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -31,6 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -87,6 +88,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private static final Log LOG = LogFactory.getLog(TaskImpl.class);
+ private static final String LINE_SEPARATOR = System
+ .getProperty("line.separator");
+
protected final Configuration conf;
protected final TaskAttemptListener taskAttemptListener;
protected final TaskHeartbeatHandler taskHeartbeatHandler;
@@ -490,6 +494,22 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
+ @Override
+ public List<String> getDiagnostics() {
+ List<String> diagnostics = new ArrayList<String>(5);
+ readLock.lock();
+ try {
+ for (TaskAttempt att : attempts.values()) {
+ String prefix = "AttemptID:" + att.getID() + " Info:";
+ diagnostics.add(prefix
+ + StringUtils.join(LINE_SEPARATOR, att.getDiagnostics()));
+ }
+ } finally {
+ readLock.unlock();
+ }
+ return diagnostics;
+ }
+
@VisibleForTesting
public TaskStateInternal getInternalState() {
readLock.lock();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/65e09f84/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 6c88467..9c121e9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1169,7 +1169,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.clock,
this.taskHeartbeatHandler,
this.appContext,
- this.targetVertices.isEmpty(),
+ (this.targetVertices != null ?
+ this.targetVertices.isEmpty() : true),
locHint, this.taskResource,
this.containerContext);
this.addTask(task);
@@ -1249,8 +1250,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// For VertexManagers setting parallelism, the setParallelism call needs
// to be inline.
vertex.numTasks = vertex.getVertexPlan().getTaskConfig().getNumTasks();
- if (!(vertex.numTasks == -1 || vertex.numTasks > 0)) {
- vertex.addDiagnostic("No tasks for vertex " + vertex.getVertexId());
+ if (!(vertex.numTasks == -1 || vertex.numTasks >= 0)) {
+ vertex.addDiagnostic("Invalid task count for vertex"
+ + ", numTasks=" + vertex.numTasks);
vertex.trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
vertex.abortVertex(VertexStatus.State.FAILED);
return vertex.finished(VertexState.FAILED);
@@ -1407,9 +1409,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// starting to downstream vertices. If the connections/structure of this
// vertex is not fully defined yet then we could send this event later
// when we are ready
- for (Vertex targetVertex : targetVertices.keySet()) {
- eventHandler.handle(new VertexEventSourceVertexStarted(targetVertex
- .getVertexId(), distanceFromRoot));
+ if (targetVertices != null) {
+ for (Vertex targetVertex : targetVertices.keySet()) {
+ eventHandler.handle(new VertexEventSourceVertexStarted(targetVertex
+ .getVertexId(), distanceFromRoot));
+ }
}
// If we have no tasks, just transition to vertex completed
@@ -1617,7 +1621,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private void taskFailed(VertexImpl vertex, Task task) {
vertex.failedTaskCount++;
- vertex.addDiagnostic("Task failed " + task.getTaskId());
+ vertex.addDiagnostic("Task failed"
+ + ", taskId=" + task.getTaskId()
+ + ", diagnostics=" + task.getDiagnostics());
// TODO Metrics
//vertex.metrics.failedTask(task);
}
@@ -1840,7 +1846,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
public void setOutputVertices(Map<Vertex, Edge> outVertices) {
this.targetVertices = outVertices;
}
-
+
@Override
public void setAdditionalInputs(List<RootInputLeafOutputProto> inputs) {
Preconditions.checkArgument(inputs.size() < 2,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/65e09f84/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 1ab9c1d..dadc0e8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -97,6 +97,7 @@ public class TestVertexImpl {
private TezDAGID dagId;
private ApplicationAttemptId appAttemptId;
private DAGPlan dagPlan;
+ private DAGPlan invalidDagPlan;
private Map<String, VertexImpl> vertices;
private Map<TezVertexID, VertexImpl> vertexIdMap;
private DrainDispatcher dispatcher;
@@ -224,7 +225,6 @@ public class TestVertexImpl {
.setTaskModule("x1.y1")
.build()
)
- .addOutEdgeId("e1")
.build()
)
.build();
@@ -527,6 +527,7 @@ public class TestVertexImpl {
ApplicationId.newInstance(100, 1), 1);
dagId = new TezDAGID(appAttemptId.getApplicationId(), 1);
dagPlan = createTestDAGPlan();
+ invalidDagPlan = createInvalidDAGPlan();
dispatcher = new DrainDispatcher();
fsTokens = new Credentials();
appContext = mock(AppContext.class);
@@ -767,7 +768,8 @@ public class TestVertexImpl {
Assert.assertEquals(VertexTerminationCause.OWN_TASK_FAILURE, v.getTerminationCause());
String diagnostics =
StringUtils.join(",", v.getDiagnostics()).toLowerCase();
- Assert.assertTrue(diagnostics.contains("task failed " + t1.toString()));
+ Assert.assertTrue(diagnostics.contains("task failed"
+ + ", taskid=" + t1.toString()));
}
@Test(timeout = 5000)
@@ -1241,16 +1243,27 @@ public class TestVertexImpl {
@Test
public void testVertexWithNoTasks() {
- TezDAGID invalidDagId = new TezDAGID(
- dagId.getApplicationId(), 1000);
- DAGPlan dPlan = createInvalidDAGPlan();
- TezVertexID vId = new TezVertexID(invalidDagId, 1);
- VertexPlan vPlan = dPlan.getVertex(0);
- VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
- dispatcher.getEventHandler(), taskAttemptListener, fsTokens,
- clock, thh, appContext, vertexLocationHint);
- v.handle(new VertexEvent(vId, VertexEventType.V_INIT));
- Assert.assertEquals(VertexState.FAILED, v.getState());
+ TezVertexID vId = null;
+ try {
+ TezDAGID invalidDagId = new TezDAGID(
+ dagId.getApplicationId(), 1000);
+ vId = new TezVertexID(invalidDagId, 1);
+ VertexPlan vPlan = invalidDagPlan.getVertex(0);
+ VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
+ dispatcher.getEventHandler(), taskAttemptListener, fsTokens,
+ clock, thh, appContext, vertexLocationHint);
+ vertexIdMap.put(vId, v);
+ v.handle(new VertexEvent(vId, VertexEventType.V_INIT));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.INITED, v.getState());
+ v.handle(new VertexEvent(vId, VertexEventType.V_START));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+ } finally {
+ if (vId != null) {
+ vertexIdMap.remove(vId);
+ }
+ }
}
}