You are viewing a plain-text version of this content. The link to the canonical (HTML) version was not preserved in this plain-text export.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/11/05 01:53:30 UTC

git commit: TEZ-591. Provide more specific diagnostic information to the Tez client. (hitesh)

Updated Branches:
  refs/heads/master a153a7762 -> 65e09f847


TEZ-591. Provide more specific diagnostic information to the Tez client. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/65e09f84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/65e09f84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/65e09f84

Branch: refs/heads/master
Commit: 65e09f847ceced8e527fe4c1f4926854f2d3c498
Parents: a153a77
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon Nov 4 16:53:06 2013 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon Nov 4 16:53:06 2013 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/api/client/DAGStatus.java    |  8 ++++-
 .../java/org/apache/tez/dag/app/dag/Task.java   |  3 ++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 18 +++++++---
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 20 +++++++++++
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 22 +++++++-----
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 37 +++++++++++++-------
 6 files changed, 82 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/65e09f84/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
index d61173d..8b7277f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
@@ -22,12 +22,16 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProtoOrBuilder;
 import org.apache.tez.dag.api.records.DAGProtos.StringProgressPairProto;
 import org.apache.tez.dag.api.TezUncheckedException;
 
 public class DAGStatus {
 
+  private static final String LINE_SEPARATOR = System
+    .getProperty("line.separator");
+
   public enum State {
     SUBMITTED,
     INITING,
@@ -123,7 +127,9 @@ public class DAGStatus {
   public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append("status=" + getState()
-        + ", progress=" + getDAGProgress());
+        + ", progress=" + getDAGProgress()
+        + ", diagnostics="
+        + StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
     return sb.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/65e09f84/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index 293e4c5..f7861cb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -78,4 +78,7 @@ public interface Task {
       int fromEventId, int maxEvents);
   
   public List<TezEvent> getAndClearTaskTezEvents();
+
+  public List<String> getDiagnostics();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/65e09f84/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 43479c4..0df7875 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -646,7 +646,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         new DAGHistoryEvent(finishEvt));
   }
 
-  static DAGState checkJobForCompletion(DAGImpl dag) {
+  static DAGState checkDAGForCompletion(DAGImpl dag) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Checking dag completion"
           + ", numCompletedVertices=" + dag.numCompletedVertices
@@ -1128,7 +1128,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           + ", numVertices=" + job.numVertices);
 
       // if the job has not finished but a failure/kill occurred, then force the transition to KILL_WAIT.
-      DAGState state = checkJobForCompletion(job);
+      DAGState state = checkDAGForCompletion(job);
       if(state == DAGState.RUNNING && forceTransitionToKillWait){
         return DAGState.TERMINATING;
       }
@@ -1167,21 +1167,29 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private void vertexReRunning(Vertex vertex) {
     reRunningVertices.add(vertex.getVertexId());
     numSuccessfulVertices--;
-    addDiagnostic("Vertex re-running " + vertex.getVertexId());
+    addDiagnostic("Vertex re-running"
+      + ", vertexName=" + vertex.getName()
+      + ", vertexId=" + vertex.getVertexId());
     // TODO: Metrics
     //job.metrics.completedTask(task);
   }
 
   private void vertexFailed(Vertex vertex) {
     numFailedVertices++;
-    addDiagnostic("Vertex failed " + vertex.getVertexId());
+    addDiagnostic("Vertex failed"
+        + ", vertexName=" + vertex.getName()
+        + ", vertexId=" + vertex.getVertexId()
+        + ", diagnostics=" + vertex.getDiagnostics());
     // TODO: Metrics
     //job.metrics.failedTask(task);
   }
 
   private void vertexKilled(Vertex vertex) {
     numKilledVertices++;
-    addDiagnostic("Vertex killed " + vertex.getVertexId());
+    addDiagnostic("Vertex killed"
+      + ", vertexName=" + vertex.getName()
+      + ", vertexId=" + vertex.getVertexId()
+      + ", diagnostics=" + vertex.getDiagnostics());
     // TODO: Metrics
     //job.metrics.killedTask(task);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/65e09f84/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 4e7dd66..3d4703b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -31,6 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -87,6 +88,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   private static final Log LOG = LogFactory.getLog(TaskImpl.class);
 
+  private static final String LINE_SEPARATOR = System
+    .getProperty("line.separator");
+
   protected final Configuration conf;
   protected final TaskAttemptListener taskAttemptListener;
   protected final TaskHeartbeatHandler taskHeartbeatHandler;
@@ -490,6 +494,22 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }        
   }
 
+  @Override
+  public List<String> getDiagnostics() {
+    List<String> diagnostics = new ArrayList<String>(5);
+    readLock.lock();
+    try {
+      for (TaskAttempt att : attempts.values()) {
+        String prefix = "AttemptID:" + att.getID() + " Info:";
+        diagnostics.add(prefix
+          + StringUtils.join(LINE_SEPARATOR, att.getDiagnostics()));
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return diagnostics;
+  }
+
   @VisibleForTesting
   public TaskStateInternal getInternalState() {
     readLock.lock();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/65e09f84/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 6c88467..9c121e9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1169,7 +1169,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               this.clock,
               this.taskHeartbeatHandler,
               this.appContext,
-              this.targetVertices.isEmpty(),
+              (this.targetVertices != null ?
+                this.targetVertices.isEmpty() : true),
               locHint, this.taskResource,
               this.containerContext);
       this.addTask(task);
@@ -1249,8 +1250,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       // For VertexManagers setting parallelism, the setParallelism call needs
       // to be inline.
       vertex.numTasks = vertex.getVertexPlan().getTaskConfig().getNumTasks();
-      if (!(vertex.numTasks == -1 || vertex.numTasks > 0)) {
-        vertex.addDiagnostic("No tasks for vertex " + vertex.getVertexId());
+      if (!(vertex.numTasks == -1 || vertex.numTasks >= 0)) {
+        vertex.addDiagnostic("Invalid task count for vertex"
+          + ", numTasks=" + vertex.numTasks);
         vertex.trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
         vertex.abortVertex(VertexStatus.State.FAILED);
         return vertex.finished(VertexState.FAILED);
@@ -1407,9 +1409,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     // starting to downstream vertices. If the connections/structure of this
     // vertex is not fully defined yet then we could send this event later
     // when we are ready
-    for (Vertex targetVertex : targetVertices.keySet()) {
-      eventHandler.handle(new VertexEventSourceVertexStarted(targetVertex
-          .getVertexId(), distanceFromRoot));
+    if (targetVertices != null) {
+      for (Vertex targetVertex : targetVertices.keySet()) {
+        eventHandler.handle(new VertexEventSourceVertexStarted(targetVertex
+            .getVertexId(), distanceFromRoot));
+      }
     }
 
     // If we have no tasks, just transition to vertex completed
@@ -1617,7 +1621,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     private void taskFailed(VertexImpl vertex, Task task) {
       vertex.failedTaskCount++;
-      vertex.addDiagnostic("Task failed " + task.getTaskId());
+      vertex.addDiagnostic("Task failed"
+        + ", taskId=" + task.getTaskId()
+        + ", diagnostics=" + task.getDiagnostics());
       // TODO Metrics
       //vertex.metrics.failedTask(task);
     }
@@ -1840,7 +1846,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   public void setOutputVertices(Map<Vertex, Edge> outVertices) {
     this.targetVertices = outVertices;
   }
-  
+
   @Override
   public void setAdditionalInputs(List<RootInputLeafOutputProto> inputs) {
     Preconditions.checkArgument(inputs.size() < 2,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/65e09f84/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 1ab9c1d..dadc0e8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -97,6 +97,7 @@ public class TestVertexImpl {
   private TezDAGID dagId;
   private ApplicationAttemptId appAttemptId;
   private DAGPlan dagPlan;
+  private DAGPlan invalidDagPlan;
   private Map<String, VertexImpl> vertices;
   private Map<TezVertexID, VertexImpl> vertexIdMap;
   private DrainDispatcher dispatcher;
@@ -224,7 +225,6 @@ public class TestVertexImpl {
             .setTaskModule("x1.y1")
             .build()
             )
-        .addOutEdgeId("e1")
         .build()
         )
         .build();
@@ -527,6 +527,7 @@ public class TestVertexImpl {
         ApplicationId.newInstance(100, 1), 1);
     dagId = new TezDAGID(appAttemptId.getApplicationId(), 1);
     dagPlan = createTestDAGPlan();
+    invalidDagPlan = createInvalidDAGPlan();
     dispatcher = new DrainDispatcher();
     fsTokens = new Credentials();
     appContext = mock(AppContext.class);
@@ -767,7 +768,8 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexTerminationCause.OWN_TASK_FAILURE, v.getTerminationCause());
     String diagnostics =
         StringUtils.join(",", v.getDiagnostics()).toLowerCase();
-    Assert.assertTrue(diagnostics.contains("task failed " + t1.toString()));
+    Assert.assertTrue(diagnostics.contains("task failed"
+        + ", taskid=" + t1.toString()));
   }
 
   @Test(timeout = 5000)
@@ -1241,16 +1243,27 @@ public class TestVertexImpl {
 
   @Test
   public void testVertexWithNoTasks() {
-    TezDAGID invalidDagId = new TezDAGID(
-        dagId.getApplicationId(), 1000);
-    DAGPlan dPlan = createInvalidDAGPlan();
-    TezVertexID vId = new TezVertexID(invalidDagId, 1);
-    VertexPlan vPlan = dPlan.getVertex(0);
-    VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
-        dispatcher.getEventHandler(), taskAttemptListener, fsTokens,
-        clock, thh, appContext, vertexLocationHint);
-    v.handle(new VertexEvent(vId, VertexEventType.V_INIT));
-    Assert.assertEquals(VertexState.FAILED, v.getState());
+    TezVertexID vId = null;
+    try {
+      TezDAGID invalidDagId = new TezDAGID(
+          dagId.getApplicationId(), 1000);
+      vId = new TezVertexID(invalidDagId, 1);
+      VertexPlan vPlan = invalidDagPlan.getVertex(0);
+      VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
+          dispatcher.getEventHandler(), taskAttemptListener, fsTokens,
+          clock, thh, appContext, vertexLocationHint);
+      vertexIdMap.put(vId, v);
+      v.handle(new VertexEvent(vId, VertexEventType.V_INIT));
+      dispatcher.await();
+      Assert.assertEquals(VertexState.INITED, v.getState());
+      v.handle(new VertexEvent(vId, VertexEventType.V_START));
+      dispatcher.await();
+      Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+    } finally {
+      if (vId != null) {
+        vertexIdMap.remove(vId);
+      }
+    }
   }
 
 }