You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by sr...@apache.org on 2016/02/18 10:58:11 UTC

[17/24] tez git commit: TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime) (rbalamohan)

TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime) (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7e3d5461
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7e3d5461
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7e3d5461

Branch: refs/heads/TEZ-2980
Commit: 7e3d5461c3b948ca1c27f386e3e9e3665b8a649e
Parents: 6f57630
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Fri Feb 12 09:59:25 2016 -0800
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Fri Feb 12 09:59:25 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 12 ++++++
 .../tez/history/parser/datamodel/DagInfo.java   | 33 ++++++++++++++-
 .../parser/datamodel/TaskAttemptInfo.java       | 37 +++++++++++++++-
 .../tez/history/parser/datamodel/TaskInfo.java  | 37 +++++++++++++++-
 .../history/parser/datamodel/VertexInfo.java    | 44 ++++++++++++++++++--
 .../apache/tez/history/TestHistoryParser.java   |  6 +++
 7 files changed, 161 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7e3d5461/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 61aaaa7..8cb7505 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime).
   TEZ-3104. Tez fails on Bzip2 intermediate output format on hadoop 2.7.1 and earlier
   TEZ-3090. MRInput should make dagIdentifier, vertexIdentifier, etc available to the InputFormat jobConf.
   TEZ-3093. CriticalPathAnalyzer should be accessible via zeppelin.

http://git-wip-us.apache.org/repos/asf/tez/blob/7e3d5461/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index dd6f834..88dfe27 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1232,6 +1232,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     if (recoveryData == null
         || recoveryData.getDAGFinishedEvent() == null) {
       Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
+      if (finishTime < startTime) {
+        LOG.warn("DAG finish time is smaller than start time. "
+            + "startTime=" + startTime
+            + ", finishTime=" + finishTime
+        );
+      }
       DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
           finishTime, DAGState.SUCCEEDED, "", counters,
           this.userName, this.dagName, taskStats, this.appContext.getApplicationAttemptId(),
@@ -1245,6 +1251,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     if (recoveryData == null
         || recoveryData.getDAGFinishedEvent() == null) {
       Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
+      if (finishTime < startTime) {
+        LOG.warn("DAG finish time is smaller than start time. "
+            + "startTime=" + startTime
+            + ", finishTime=" + finishTime
+        );
+      }
       DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
           finishTime, state,
           StringUtils.join(getDiagnostics(), LINE_SEPARATOR),

http://git-wip-us.apache.org/repos/asf/tez/blob/7e3d5461/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
index 5fb760c..8057be7 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
@@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.tez.client.CallerContext;
 import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.history.HistoryEventType;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -98,8 +99,36 @@ public class DagInfo extends BaseInfo {
 
     //Parse additional Info
     JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
-    startTime = otherInfoNode.optLong(Constants.START_TIME);
-    endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+
+    long sTime = otherInfoNode.optLong(Constants.START_TIME);
+    long eTime= otherInfoNode.optLong(Constants.FINISH_TIME);
+    if (eTime < sTime) {
+      LOG.warn("DAG has got wrong start/end values. "
+          + "startTime=" + sTime + ", endTime=" + eTime + ". Will check "
+          + "timestamps in DAG started/finished events");
+
+      // Check if events DAG_STARTED, DAG_FINISHED can be made use of
+      for(Event event : eventList) {
+        switch (HistoryEventType.valueOf(event.getType())) {
+        case DAG_STARTED:
+          sTime = event.getAbsoluteTime();
+          break;
+        case DAG_FINISHED:
+          eTime = event.getAbsoluteTime();
+          break;
+        default:
+          break;
+        }
+      }
+
+      if (eTime < sTime) {
+        LOG.warn("DAG has got wrong start/end values in events as well. "
+            + "startTime=" + sTime + ", endTime=" + eTime);
+      }
+    }
+    startTime = sTime;
+    endTime = eTime;
+
     //TODO: Not getting populated correctly for lots of jobs.  Verify
     submitTime = otherInfoNode.optLong(Constants.START_REQUESTED_TIME);
     diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);

http://git-wip-us.apache.org/repos/asf/tez/blob/7e3d5461/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
index d373513..885d743 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
@@ -24,12 +24,15 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.history.parser.utils.Utils;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -46,6 +49,8 @@ import static org.apache.hadoop.classification.InterfaceAudience.Public;
 @Evolving
 public class TaskAttemptInfo extends BaseInfo {
 
+  private static final Log LOG = LogFactory.getLog(TaskAttemptInfo.class);
+
   private static final String SUCCEEDED = StringInterner.weakIntern(TaskAttemptState.SUCCEEDED.name());
 
   private final String taskAttemptId;
@@ -95,8 +100,36 @@ public class TaskAttemptInfo extends BaseInfo {
 
     //Parse additional Info
     final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
-    startTime = otherInfoNode.optLong(Constants.START_TIME);
-    endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+
+    long sTime = otherInfoNode.optLong(Constants.START_TIME);
+    long eTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+    if (eTime < sTime) {
+      LOG.warn("TaskAttemptInfo has got wrong start/end values. "
+          + "startTime=" + sTime + ", endTime=" + eTime + ". Will check "
+          + "timestamps in DAG started/finished events");
+
+      // Check if events TASK_ATTEMPT_STARTED, TASK_ATTEMPT_FINISHED can be made use of
+      for(Event event : eventList) {
+        switch (HistoryEventType.valueOf(event.getType())) {
+        case TASK_ATTEMPT_STARTED:
+          sTime = event.getAbsoluteTime();
+          break;
+        case TASK_ATTEMPT_FINISHED:
+          eTime = event.getAbsoluteTime();
+          break;
+        default:
+          break;
+        }
+      }
+
+      if (eTime < sTime) {
+        LOG.warn("TaskAttemptInfo has got wrong start/end values in events as well. "
+            + "startTime=" + sTime + ", endTime=" + eTime);
+      }
+    }
+    startTime = sTime;
+    endTime = eTime;
+
     diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
     creationTime = otherInfoNode.optLong(Constants.CREATION_TIME);
     creationCausalTA = StringInterner.weakIntern(

http://git-wip-us.apache.org/repos/asf/tez/blob/7e3d5461/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
index c6f89d6..fb3f232 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
@@ -29,8 +29,11 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.Ordering;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.history.HistoryEventType;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
@@ -46,6 +49,8 @@ import static org.apache.hadoop.classification.InterfaceStability.Evolving;
 @Evolving
 public class TaskInfo extends BaseInfo {
 
+  private static final Log LOG = LogFactory.getLog(TaskInfo.class);
+
   private final long startTime;
   private final long endTime;
   private final String diagnostics;
@@ -70,8 +75,36 @@ public class TaskInfo extends BaseInfo {
 
     //Parse additional Info
     final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
-    startTime = otherInfoNode.optLong(Constants.START_TIME);
-    endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+
+    long sTime = otherInfoNode.optLong(Constants.START_TIME);
+    long eTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+    if (eTime < sTime) {
+      LOG.warn("Task has got wrong start/end values. "
+          + "startTime=" + sTime + ", endTime=" + eTime + ". Will check "
+          + "timestamps in DAG started/finished events");
+
+      // Check if events TASK_STARTED, TASK_FINISHED can be made use of
+      for(Event event : eventList) {
+        switch (HistoryEventType.valueOf(event.getType())) {
+        case TASK_STARTED:
+          sTime = event.getAbsoluteTime();
+          break;
+        case TASK_FINISHED:
+          eTime = event.getAbsoluteTime();
+          break;
+        default:
+          break;
+        }
+      }
+
+      if (eTime < sTime) {
+        LOG.warn("Task has got wrong start/end values in events as well. "
+            + "startTime=" + sTime + ", endTime=" + eTime);
+      }
+    }
+    startTime = sTime;
+    endTime = eTime;
+
     diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
     successfulAttemptId = StringInterner.weakIntern(
         otherInfoNode.optString(Constants.SUCCESSFUL_ATTEMPT_ID));

http://git-wip-us.apache.org/repos/asf/tez/blob/7e3d5461/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
index 50647fe..0f6831b 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
@@ -28,8 +28,11 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.Ordering;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.history.HistoryEventType;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
@@ -46,6 +49,8 @@ import static org.apache.hadoop.classification.InterfaceStability.Evolving;
 @Evolving
 public class VertexInfo extends BaseInfo {
 
+  private static final Log LOG = LogFactory.getLog(VertexInfo.class);
+
   private final String vertexId;
   private final String vertexName;
   private final long finishTime;
@@ -98,9 +103,42 @@ public class VertexInfo extends BaseInfo {
     JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
     initRequestedTime = otherInfoNode.optLong(Constants.INIT_REQUESTED_TIME);
     startRequestedTime = otherInfoNode.optLong(Constants.START_REQUESTED_TIME);
-    startTime = otherInfoNode.optLong(Constants.START_TIME);
-    initTime = otherInfoNode.optLong(Constants.INIT_TIME);
-    finishTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+
+    long sTime = otherInfoNode.optLong(Constants.START_TIME);
+    long iTime = otherInfoNode.optLong(Constants.INIT_TIME);
+    long eTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+    if (eTime < sTime) {
+      LOG.warn("Vertex has got wrong start/end values. "
+          + "startTime=" + sTime + ", endTime=" + eTime + ". Will check "
+          + "timestamps in DAG started/finished events");
+
+      // Check if events VERTEX_INITIALIZED, VERTEX_STARTED, VERTEX_FINISHED can be made use of
+      for(Event event : eventList) {
+        switch (HistoryEventType.valueOf(event.getType())) {
+        case VERTEX_INITIALIZED:
+          iTime = event.getAbsoluteTime();
+          break;
+        case VERTEX_STARTED:
+          sTime = event.getAbsoluteTime();
+          break;
+        case VERTEX_FINISHED:
+          eTime = event.getAbsoluteTime();
+          break;
+        default:
+          break;
+        }
+      }
+
+      if (eTime < sTime) {
+        LOG.warn("Vertex has got wrong start/end values in events as well. "
+            + "startTime=" + sTime + ", endTime=" + eTime);
+      }
+    }
+    startTime = sTime;
+    finishTime = eTime;
+    initTime = iTime;
+
+
     diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
     numTasks = otherInfoNode.optInt(Constants.NUM_TASKS);
     failedTasks = otherInfoNode.optInt(Constants.NUM_FAILED_TASKS);

http://git-wip-us.apache.org/repos/asf/tez/blob/7e3d5461/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
index b373f6e..372585b 100644
--- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
+++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
@@ -249,6 +249,7 @@ public class TestHistoryParser {
         WordCount.TokenProcessor.class.getName()));
     assertTrue(dagInfo.getVertex(SUMMATION).getProcessorClassName()
         .equals(WordCount.SumProcessor.class.getName()));
+    assertTrue(dagInfo.getFinishTime() > dagInfo.getStartTime());
     assertTrue(dagInfo.getEdges().size() == 1);
     EdgeInfo edgeInfo = dagInfo.getEdges().iterator().next();
     assertTrue(edgeInfo.getDataMovementType().
@@ -269,6 +270,7 @@ public class TestHistoryParser {
       assertTrue(vertexInfo.getStartRequestedTime() > 0);
       assertTrue(vertexInfo.getStartTime() > 0);
       assertTrue(vertexInfo.getFinishTime() > 0);
+      assertTrue(vertexInfo.getFinishTime() > vertexInfo.getStartTime());
       long finishTime = 0;
       for (TaskInfo taskInfo : vertexInfo.getTasks()) {
         assertTrue(taskInfo.getNumberOfTaskAttempts() == 1);
@@ -280,6 +282,7 @@ public class TestHistoryParser {
         assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0);
         assertTrue(taskInfo.getFailedTaskAttempts().size() == 0);
         assertTrue(taskInfo.getKilledTaskAttempts().size() == 0);
+        assertTrue(taskInfo.getFinishTime() > taskInfo.getStartTime());
         List<TaskAttemptInfo> attempts = taskInfo.getTaskAttempts();
         if (vertexInfo.getVertexName().equals(TOKENIZER)) {
           // get the last task to finish and track its successful attempt
@@ -304,6 +307,7 @@ public class TestHistoryParser {
           assertTrue(attemptInfo.getCreationTime() > 0);
           assertTrue(attemptInfo.getAllocationTime() > 0);
           assertTrue(attemptInfo.getStartTime() > 0);
+          assertTrue(attemptInfo.getFinishTime() > attemptInfo.getStartTime());
         }
       }
       assertTrue(vertexInfo.getLastTaskToFinish() != null);
@@ -748,6 +752,7 @@ public class TestHistoryParser {
       assertTrue(vertexInfo.getFirstTaskToStart() != null);
       assertTrue(vertexInfo.getSucceededTasksCount() > 0);
       assertTrue(vertexInfo.getTasks().size() > 0);
+      assertTrue(vertexInfo.getFinishTime() > vertexInfo.getStartTime());
     }
 
     for (TaskInfo taskInfo : vertexInfo.getTasks()) {
@@ -781,6 +786,7 @@ public class TestHistoryParser {
           taskInfo.getStartTimeInterval() > 0 && taskInfo.getStartTime() > taskInfo.getStartTimeInterval());
       assertTrue(taskInfo.getSuccessfulAttemptId() != null);
       assertTrue(taskInfo.getSuccessfulTaskAttempt() != null);
+      assertTrue(taskInfo.getFinishTime() > taskInfo.getStartTime());
     }
     assertTrue(taskInfo.getTaskId() != null);