You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by sr...@apache.org on 2016/02/18 10:58:11 UTC
[17/24] tez git commit: TEZ-3107. tez-tools: Log warn msgs in case
ATS has wrong values (e.g startTime > finishTime) (rbalamohan)
TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime) (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7e3d5461
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7e3d5461
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7e3d5461
Branch: refs/heads/TEZ-2980
Commit: 7e3d5461c3b948ca1c27f386e3e9e3665b8a649e
Parents: 6f57630
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Fri Feb 12 09:59:25 2016 -0800
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Fri Feb 12 09:59:25 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 12 ++++++
.../tez/history/parser/datamodel/DagInfo.java | 33 ++++++++++++++-
.../parser/datamodel/TaskAttemptInfo.java | 37 +++++++++++++++-
.../tez/history/parser/datamodel/TaskInfo.java | 37 +++++++++++++++-
.../history/parser/datamodel/VertexInfo.java | 44 ++++++++++++++++++--
.../apache/tez/history/TestHistoryParser.java | 6 +++
7 files changed, 161 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/7e3d5461/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 61aaaa7..8cb7505 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime).
TEZ-3104. Tez fails on Bzip2 intermediate output format on hadoop 2.7.1 and earlier
TEZ-3090. MRInput should make dagIdentifier, vertexIdentifier, etc available to the InputFormat jobConf.
TEZ-3093. CriticalPathAnalyzer should be accessible via zeppelin.
http://git-wip-us.apache.org/repos/asf/tez/blob/7e3d5461/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index dd6f834..88dfe27 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1232,6 +1232,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
if (recoveryData == null
|| recoveryData.getDAGFinishedEvent() == null) {
Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
+ if (finishTime < startTime) {
+ LOG.warn("DAG finish time is smaller than start time. "
+ + "startTime=" + startTime
+ + ", finishTime=" + finishTime
+ );
+ }
DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
finishTime, DAGState.SUCCEEDED, "", counters,
this.userName, this.dagName, taskStats, this.appContext.getApplicationAttemptId(),
@@ -1245,6 +1251,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
if (recoveryData == null
|| recoveryData.getDAGFinishedEvent() == null) {
Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
+ if (finishTime < startTime) {
+ LOG.warn("DAG finish time is smaller than start time. "
+ + "startTime=" + startTime
+ + ", finishTime=" + finishTime
+ );
+ }
DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
finishTime, state,
StringUtils.join(getDiagnostics(), LINE_SEPARATOR),
http://git-wip-us.apache.org/repos/asf/tez/blob/7e3d5461/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
index 5fb760c..8057be7 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
@@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringInterner;
import org.apache.tez.client.CallerContext;
import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.history.HistoryEventType;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -98,8 +99,36 @@ public class DagInfo extends BaseInfo {
//Parse additional Info
JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
- startTime = otherInfoNode.optLong(Constants.START_TIME);
- endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+
+ long sTime = otherInfoNode.optLong(Constants.START_TIME);
+ long eTime= otherInfoNode.optLong(Constants.FINISH_TIME);
+ if (eTime < sTime) {
+ LOG.warn("DAG has got wrong start/end values. "
+ + "startTime=" + sTime + ", endTime=" + eTime + ". Will check "
+ + "timestamps in DAG started/finished events");
+
+ // Check if events DAG_STARTED, DAG_FINISHED can be made use of
+ for(Event event : eventList) {
+ switch (HistoryEventType.valueOf(event.getType())) {
+ case DAG_STARTED:
+ sTime = event.getAbsoluteTime();
+ break;
+ case DAG_FINISHED:
+ eTime = event.getAbsoluteTime();
+ break;
+ default:
+ break;
+ }
+ }
+
+ if (eTime < sTime) {
+ LOG.warn("DAG has got wrong start/end values in events as well. "
+ + "startTime=" + sTime + ", endTime=" + eTime);
+ }
+ }
+ startTime = sTime;
+ endTime = eTime;
+
//TODO: Not getting populated correctly for lots of jobs. Verify
submitTime = otherInfoNode.optLong(Constants.START_REQUESTED_TIME);
diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
http://git-wip-us.apache.org/repos/asf/tez/blob/7e3d5461/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
index d373513..885d743 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
@@ -24,12 +24,15 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringInterner;
import org.apache.tez.common.ATSConstants;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.history.parser.utils.Utils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -46,6 +49,8 @@ import static org.apache.hadoop.classification.InterfaceAudience.Public;
@Evolving
public class TaskAttemptInfo extends BaseInfo {
+ private static final Log LOG = LogFactory.getLog(TaskAttemptInfo.class);
+
private static final String SUCCEEDED = StringInterner.weakIntern(TaskAttemptState.SUCCEEDED.name());
private final String taskAttemptId;
@@ -95,8 +100,36 @@ public class TaskAttemptInfo extends BaseInfo {
//Parse additional Info
final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
- startTime = otherInfoNode.optLong(Constants.START_TIME);
- endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+
+ long sTime = otherInfoNode.optLong(Constants.START_TIME);
+ long eTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+ if (eTime < sTime) {
+ LOG.warn("TaskAttemptInfo has got wrong start/end values. "
+ + "startTime=" + sTime + ", endTime=" + eTime + ". Will check "
+ + "timestamps in DAG started/finished events");
+
+ // Check if events TASK_ATTEMPT_STARTED, TASK_ATTEMPT_FINISHED can be made use of
+ for(Event event : eventList) {
+ switch (HistoryEventType.valueOf(event.getType())) {
+ case TASK_ATTEMPT_STARTED:
+ sTime = event.getAbsoluteTime();
+ break;
+ case TASK_ATTEMPT_FINISHED:
+ eTime = event.getAbsoluteTime();
+ break;
+ default:
+ break;
+ }
+ }
+
+ if (eTime < sTime) {
+ LOG.warn("TaskAttemptInfo has got wrong start/end values in events as well. "
+ + "startTime=" + sTime + ", endTime=" + eTime);
+ }
+ }
+ startTime = sTime;
+ endTime = eTime;
+
diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
creationTime = otherInfoNode.optLong(Constants.CREATION_TIME);
creationCausalTA = StringInterner.weakIntern(
http://git-wip-us.apache.org/repos/asf/tez/blob/7e3d5461/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
index c6f89d6..fb3f232 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
@@ -29,8 +29,11 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Ordering;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringInterner;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.history.HistoryEventType;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -46,6 +49,8 @@ import static org.apache.hadoop.classification.InterfaceStability.Evolving;
@Evolving
public class TaskInfo extends BaseInfo {
+ private static final Log LOG = LogFactory.getLog(TaskInfo.class);
+
private final long startTime;
private final long endTime;
private final String diagnostics;
@@ -70,8 +75,36 @@ public class TaskInfo extends BaseInfo {
//Parse additional Info
final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
- startTime = otherInfoNode.optLong(Constants.START_TIME);
- endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+
+ long sTime = otherInfoNode.optLong(Constants.START_TIME);
+ long eTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+ if (eTime < sTime) {
+ LOG.warn("Task has got wrong start/end values. "
+ + "startTime=" + sTime + ", endTime=" + eTime + ". Will check "
+ + "timestamps in DAG started/finished events");
+
+ // Check if events TASK_STARTED, TASK_FINISHED can be made use of
+ for(Event event : eventList) {
+ switch (HistoryEventType.valueOf(event.getType())) {
+ case TASK_STARTED:
+ sTime = event.getAbsoluteTime();
+ break;
+ case TASK_FINISHED:
+ eTime = event.getAbsoluteTime();
+ break;
+ default:
+ break;
+ }
+ }
+
+ if (eTime < sTime) {
+ LOG.warn("Task has got wrong start/end values in events as well. "
+ + "startTime=" + sTime + ", endTime=" + eTime);
+ }
+ }
+ startTime = sTime;
+ endTime = eTime;
+
diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
successfulAttemptId = StringInterner.weakIntern(
otherInfoNode.optString(Constants.SUCCESSFUL_ATTEMPT_ID));
http://git-wip-us.apache.org/repos/asf/tez/blob/7e3d5461/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
index 50647fe..0f6831b 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
@@ -28,8 +28,11 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Ordering;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringInterner;
import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.history.HistoryEventType;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -46,6 +49,8 @@ import static org.apache.hadoop.classification.InterfaceStability.Evolving;
@Evolving
public class VertexInfo extends BaseInfo {
+ private static final Log LOG = LogFactory.getLog(VertexInfo.class);
+
private final String vertexId;
private final String vertexName;
private final long finishTime;
@@ -98,9 +103,42 @@ public class VertexInfo extends BaseInfo {
JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
initRequestedTime = otherInfoNode.optLong(Constants.INIT_REQUESTED_TIME);
startRequestedTime = otherInfoNode.optLong(Constants.START_REQUESTED_TIME);
- startTime = otherInfoNode.optLong(Constants.START_TIME);
- initTime = otherInfoNode.optLong(Constants.INIT_TIME);
- finishTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+
+ long sTime = otherInfoNode.optLong(Constants.START_TIME);
+ long iTime = otherInfoNode.optLong(Constants.INIT_TIME);
+ long eTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+ if (eTime < sTime) {
+ LOG.warn("Vertex has got wrong start/end values. "
+ + "startTime=" + sTime + ", endTime=" + eTime + ". Will check "
+ + "timestamps in DAG started/finished events");
+
+ // Check if events VERTEX_INITIALIZED, VERTEX_STARTED, VERTEX_FINISHED can be made use of
+ for(Event event : eventList) {
+ switch (HistoryEventType.valueOf(event.getType())) {
+ case VERTEX_INITIALIZED:
+ iTime = event.getAbsoluteTime();
+ break;
+ case VERTEX_STARTED:
+ sTime = event.getAbsoluteTime();
+ break;
+ case VERTEX_FINISHED:
+ eTime = event.getAbsoluteTime();
+ break;
+ default:
+ break;
+ }
+ }
+
+ if (eTime < sTime) {
+ LOG.warn("Vertex has got wrong start/end values in events as well. "
+ + "startTime=" + sTime + ", endTime=" + eTime);
+ }
+ }
+ startTime = sTime;
+ finishTime = eTime;
+ initTime = iTime;
+
+
diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
numTasks = otherInfoNode.optInt(Constants.NUM_TASKS);
failedTasks = otherInfoNode.optInt(Constants.NUM_FAILED_TASKS);
http://git-wip-us.apache.org/repos/asf/tez/blob/7e3d5461/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
index b373f6e..372585b 100644
--- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
+++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
@@ -249,6 +249,7 @@ public class TestHistoryParser {
WordCount.TokenProcessor.class.getName()));
assertTrue(dagInfo.getVertex(SUMMATION).getProcessorClassName()
.equals(WordCount.SumProcessor.class.getName()));
+ assertTrue(dagInfo.getFinishTime() > dagInfo.getStartTime());
assertTrue(dagInfo.getEdges().size() == 1);
EdgeInfo edgeInfo = dagInfo.getEdges().iterator().next();
assertTrue(edgeInfo.getDataMovementType().
@@ -269,6 +270,7 @@ public class TestHistoryParser {
assertTrue(vertexInfo.getStartRequestedTime() > 0);
assertTrue(vertexInfo.getStartTime() > 0);
assertTrue(vertexInfo.getFinishTime() > 0);
+ assertTrue(vertexInfo.getFinishTime() > vertexInfo.getStartTime());
long finishTime = 0;
for (TaskInfo taskInfo : vertexInfo.getTasks()) {
assertTrue(taskInfo.getNumberOfTaskAttempts() == 1);
@@ -280,6 +282,7 @@ public class TestHistoryParser {
assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0);
assertTrue(taskInfo.getFailedTaskAttempts().size() == 0);
assertTrue(taskInfo.getKilledTaskAttempts().size() == 0);
+ assertTrue(taskInfo.getFinishTime() > taskInfo.getStartTime());
List<TaskAttemptInfo> attempts = taskInfo.getTaskAttempts();
if (vertexInfo.getVertexName().equals(TOKENIZER)) {
// get the last task to finish and track its successful attempt
@@ -304,6 +307,7 @@ public class TestHistoryParser {
assertTrue(attemptInfo.getCreationTime() > 0);
assertTrue(attemptInfo.getAllocationTime() > 0);
assertTrue(attemptInfo.getStartTime() > 0);
+ assertTrue(attemptInfo.getFinishTime() > attemptInfo.getStartTime());
}
}
assertTrue(vertexInfo.getLastTaskToFinish() != null);
@@ -748,6 +752,7 @@ public class TestHistoryParser {
assertTrue(vertexInfo.getFirstTaskToStart() != null);
assertTrue(vertexInfo.getSucceededTasksCount() > 0);
assertTrue(vertexInfo.getTasks().size() > 0);
+ assertTrue(vertexInfo.getFinishTime() > vertexInfo.getStartTime());
}
for (TaskInfo taskInfo : vertexInfo.getTasks()) {
@@ -781,6 +786,7 @@ public class TestHistoryParser {
taskInfo.getStartTimeInterval() > 0 && taskInfo.getStartTime() > taskInfo.getStartTimeInterval());
assertTrue(taskInfo.getSuccessfulAttemptId() != null);
assertTrue(taskInfo.getSuccessfulTaskAttempt() != null);
+ assertTrue(taskInfo.getFinishTime() > taskInfo.getStartTime());
}
assertTrue(taskInfo.getTaskId() != null);