You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/14 19:36:26 UTC
git commit: TEZ-930. Provide additional aggregated task stats at the
vertex level. (hitesh)
Repository: incubator-tez
Updated Branches:
refs/heads/master 537d3bca4 -> 6a2585764
TEZ-930. Provide additional aggregated task stats at the vertex level. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6a258576
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6a258576
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6a258576
Branch: refs/heads/master
Commit: 6a2585764c64d0b70be34b59fab1c964a54588b4
Parents: 537d3bc
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Mar 14 11:36:04 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Mar 14 11:36:04 2014 -0700
----------------------------------------------------------------------
.../tez/dag/api/oldrecords/TaskReport.java | 27 +--
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 25 +--
.../tez/dag/app/dag/impl/TaskReportImpl.java | 98 +++++++++++
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 37 +++-
.../tez/dag/app/dag/impl/VertexStats.java | 169 +++++++++++++++++++
.../dag/history/events/VertexFinishedEvent.java | 16 +-
.../tez/dag/history/utils/ATSConstants.java | 1 +
.../apache/tez/dag/history/utils/DAGUtils.java | 52 ++++++
.../tez/dag/app/dag/impl/TestVertexStats.java | 159 +++++++++++++++++
.../TestHistoryEventsProtoConversion.java | 5 +-
10 files changed, 531 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskReport.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskReport.java b/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskReport.java
index 3a0685f..76bfc96 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskReport.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskReport.java
@@ -30,34 +30,11 @@ public interface TaskReport {
public abstract float getProgress();
public abstract long getStartTime();
public abstract long getFinishTime();
- public abstract TezCounters getCounters();
-
- public abstract List<TezTaskAttemptID> getRunningAttemptsList();
- public abstract TezTaskAttemptID getRunningAttempt(int index);
- public abstract int getRunningAttemptsCount();
-
- public abstract TezTaskAttemptID getSuccessfulAttempt();
-
- public abstract List<String> getDiagnosticsList();
- public abstract String getDiagnostics(int index);
- public abstract int getDiagnosticsCount();
-
-
+
public abstract void setTaskId(TezTaskID taskId);
public abstract void setTaskState(TaskState taskState);
public abstract void setProgress(float progress);
public abstract void setStartTime(long startTime);
public abstract void setFinishTime(long finishTime);
- public abstract void setCounters(TezCounters counters);
-
- public abstract void addAllRunningAttempts(List<TezTaskAttemptID> taskAttempts);
- public abstract void addRunningAttempt(TezTaskAttemptID taskAttempt);
- public abstract void removeRunningAttempt(int index);
- public abstract void clearRunningAttempts();
-
- public abstract void setSuccessfulAttempt(TezTaskAttemptID taskAttempt);
- public abstract void addAllDiagnostics(List<String> diagnostics);
- public abstract void addDiagnostics(String diagnostics);
- public abstract void removeDiagnostics(int index);
- public abstract void clearDiagnostics();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 16c063a..f6b3faa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -408,9 +408,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
@Override
public TaskReport getReport() {
- // TODO TEZPB This is broken. Records will not work without the PBImpl, which
- // is in a different package.
- TaskReport report = Records.newRecord(TaskReport.class);
+ TaskReport report = new TaskReportImpl();
readLock.lock();
try {
report.setTaskId(taskId);
@@ -418,27 +416,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
report.setFinishTime(getFinishTime());
report.setTaskState(getState());
report.setProgress(getProgress());
-
- for (TaskAttempt attempt : attempts.values()) {
- if (TaskAttemptState.RUNNING.equals(attempt.getState())) {
- report.addRunningAttempt(attempt.getID());
- }
- }
-
- report.setSuccessfulAttempt(successfulAttempt);
-
- for (TaskAttempt att : attempts.values()) {
- String prefix = "AttemptID:" + att.getID() + " Info:";
- for (CharSequence cs : att.getDiagnostics()) {
- report.addDiagnostics(prefix + cs);
-
- }
- }
-
- // Add a copy of counters as the last step so that their lifetime on heap
- // is as small as possible.
- report.setCounters(getCounters());
-
return report;
} finally {
readLock.unlock();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskReportImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskReportImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskReportImpl.java
new file mode 100644
index 0000000..1a2cee3
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskReportImpl.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.oldrecords.TaskReport;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.records.TezTaskID;
+
+import java.util.List;
+
+/**
+ * Simple in-memory {@link TaskReport}: a mutable holder for a task's id,
+ * state, progress and start/finish timestamps.
+ */
+public class TaskReportImpl implements TaskReport {
+
+  private TezTaskID taskID;
+  private TaskState taskState;
+  private float progress;
+  private long startTime;
+  private long finishTime;
+
+  /**
+   * Creates an empty report; every field keeps its Java default value until
+   * the corresponding setter is called.
+   */
+  public TaskReportImpl() {
+  }
+
+  /**
+   * Creates a fully populated report.
+   *
+   * @param taskID id of the task being reported on
+   * @param taskState state of the task
+   * @param progress progress value of the task
+   * @param startTime start timestamp of the task
+   * @param finishTime finish timestamp of the task
+   */
+  public TaskReportImpl(TezTaskID taskID, TaskState taskState,
+      float progress, long startTime, long finishTime) {
+    this.taskID = taskID;
+    this.taskState = taskState;
+    this.progress = progress;
+    this.startTime = startTime;
+    this.finishTime = finishTime;
+  }
+
+  // --- task id ---
+
+  @Override
+  public TezTaskID getTaskId() {
+    return this.taskID;
+  }
+
+  @Override
+  public void setTaskId(TezTaskID taskId) {
+    this.taskID = taskId;
+  }
+
+  // --- task state ---
+
+  @Override
+  public TaskState getTaskState() {
+    return this.taskState;
+  }
+
+  @Override
+  public void setTaskState(TaskState taskState) {
+    this.taskState = taskState;
+  }
+
+  // --- progress ---
+
+  @Override
+  public float getProgress() {
+    return this.progress;
+  }
+
+  @Override
+  public void setProgress(float progress) {
+    this.progress = progress;
+  }
+
+  // --- start time ---
+
+  @Override
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  @Override
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  // --- finish time ---
+
+  @Override
+  public long getFinishTime() {
+    return this.finishTime;
+  }
+
+  @Override
+  public void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 7b3b6b4..d3294ea 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -536,6 +536,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Recovery related flags
boolean recoveryInitEventSeen = false;
boolean recoveryStartEventSeen = false;
+ private VertexStats vertexStats = null;
public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
String vertexName, Configuration conf, EventHandler eventHandler,
@@ -719,6 +720,26 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
+ /**
+  * Returns the aggregated task statistics for this vertex.
+  *
+  * While the vertex is still active, a fresh {@link VertexStats} is built
+  * from the current task reports on every call. Once the vertex is in one
+  * of the states SUCCEEDED, FAILED, KILLED or ERROR, the value of the
+  * {@code vertexStats} field is returned after calling
+  * {@code mayBeConstructFinalFullCounters()}.
+  * NOTE(review): that helper presumably populates the field on first use;
+  * confirm against its body, which is not part of this hunk.
+  */
+ public VertexStats getVertexStats() {
+   readLock.lock();
+   try {
+     final VertexState current = getInternalState();
+     final boolean terminal = current == VertexState.ERROR
+         || current == VertexState.FAILED
+         || current == VertexState.KILLED
+         || current == VertexState.SUCCEEDED;
+
+     if (!terminal) {
+       // Still running: compute a point-in-time snapshot from the live tasks.
+       return updateVertexStats(new VertexStats(), tasks.values());
+     }
+
+     this.mayBeConstructFinalFullCounters();
+     return this.vertexStats;
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+
public static TezCounters incrTaskCounters(
TezCounters counters, Collection<Task> tasks) {
for (Task task : tasks) {
@@ -727,6 +748,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return counters;
}
+ /**
+  * Feeds the report of every given task into {@code stats}.
+  *
+  * @param stats accumulator to update in place
+  * @param tasks tasks whose reports are folded into the accumulator
+  * @return the same {@code stats} instance that was passed in
+  */
+ public static VertexStats updateVertexStats(
+     VertexStats stats, Collection<Task> tasks) {
+   for (Task current : tasks) {
+     stats.updateStats(current.getReport());
+   }
+   return stats;
+ }
+
+
@Override
public List<String> getDiagnostics() {
readLock.lock();
@@ -1235,7 +1265,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
vertexName, initTimeRequested, initedTime, startTimeRequested,
startedTime, finishTime, VertexState.SUCCEEDED, "",
- getAllCounters());
+ getAllCounters(), getVertexStats());
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGId(), finishEvt));
}
@@ -1244,7 +1274,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
vertexName, initTimeRequested, initedTime, startTimeRequested,
startedTime, clock.getTime(), state, StringUtils.join(LINE_SEPARATOR,
- getDiagnostics()), getAllCounters());
+ getDiagnostics()), getAllCounters(), getVertexStats());
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGId(), finishEvt));
}
@@ -2476,7 +2506,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Private
public void constructFinalFullcounters() {
this.fullCounters = new TezCounters();
+ this.vertexStats = new VertexStats();
+
for (Task t : this.tasks.values()) {
+ vertexStats.updateStats(t.getReport());
TezCounters counters = t.getCounters();
this.fullCounters.incrAllCounters(counters);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexStats.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexStats.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexStats.java
new file mode 100644
index 0000000..86d1797
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexStats.java
@@ -0,0 +1,169 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.tez.dag.api.oldrecords.TaskReport;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.records.TezTaskID;
+
+/**
+ * Aggregated task-level timing statistics for a single vertex.
+ *
+ * Task reports are folded in one at a time through
+ * {@link #updateStats(TaskReport)}. For every tracked extreme (earliest
+ * start, latest finish, shortest and longest duration) the set of all tasks
+ * tied for that extreme is kept alongside the value. A value of -1 in any
+ * time or duration field means no qualifying task has been seen yet.
+ */
+public class VertexStats {
+
+  long firstTaskStartTime = -1;
+  Set<TezTaskID> firstTasksToStart = new HashSet<TezTaskID>();
+  long lastTaskFinishTime = -1;
+  Set<TezTaskID> lastTasksToFinish = new HashSet<TezTaskID>();
+
+  // Duration statistics only cover tasks that finished in SUCCEEDED state.
+  long minTaskDuration = -1;
+  long maxTaskDuration = -1;
+  double avgTaskDuration = -1;
+  long numSuccessfulTasks = 0;
+
+  Set<TezTaskID> shortestDurationTasks = new HashSet<TezTaskID>();
+  Set<TezTaskID> longestDurationTasks = new HashSet<TezTaskID>();
+
+  /** @return earliest task start time seen so far, or -1 if none */
+  public long getFirstTaskStartTime() {
+    return firstTaskStartTime;
+  }
+
+  /** @return read-only view of the tasks sharing the earliest start time */
+  public Set<TezTaskID> getFirstTaskToStart() {
+    return Collections.unmodifiableSet(firstTasksToStart);
+  }
+
+  /** @return latest task finish time seen so far, or -1 if none */
+  public long getLastTaskFinishTime() {
+    return lastTaskFinishTime;
+  }
+
+  /** @return read-only view of the tasks sharing the latest finish time */
+  public Set<TezTaskID> getLastTaskToFinish() {
+    return Collections.unmodifiableSet(lastTasksToFinish);
+  }
+
+  /** @return shortest duration among successful tasks, or -1 if none */
+  public long getMinTaskDuration() {
+    return minTaskDuration;
+  }
+
+  /** @return longest duration among successful tasks, or -1 if none */
+  public long getMaxTaskDuration() {
+    return maxTaskDuration;
+  }
+
+  /** @return mean duration of successful tasks, or -1 if none */
+  public double getAvgTaskDuration() {
+    return avgTaskDuration;
+  }
+
+  /** @return read-only view of the successful tasks tied for shortest duration */
+  public Set<TezTaskID> getShortestDurationTask() {
+    return Collections.unmodifiableSet(shortestDurationTasks);
+  }
+
+  /** @return read-only view of the successful tasks tied for longest duration */
+  public Set<TezTaskID> getLongestDurationTask() {
+    return Collections.unmodifiableSet(longestDurationTasks);
+  }
+
+  /**
+   * Folds one task report into the aggregate.
+   *
+   * Start and finish extremes are tracked for tasks in any state; duration
+   * statistics are only updated for tasks in SUCCEEDED state whose finish
+   * time is not before their start time.
+   *
+   * @param taskReport report of the task to account for
+   */
+  void updateStats(TaskReport taskReport) {
+    final TezTaskID taskId = taskReport.getTaskId();
+    final long startTime = taskReport.getStartTime();
+    final long finishTime = taskReport.getFinishTime();
+
+    // Mirror the finish-time guard below: a task without a positive start
+    // time is treated as "not started" and must not become the first task
+    // to start (a start time of -1 would also collide with the sentinel).
+    // NOTE(review): assumes unstarted tasks report a non-positive start
+    // time - confirm against the Task implementation.
+    if ((startTime > 0) &&
+        (firstTaskStartTime == -1
+            || firstTaskStartTime >= startTime)) {
+      if (firstTaskStartTime != startTime) {
+        // Strictly earlier than everything seen so far: drop the old ties.
+        firstTasksToStart.clear();
+      }
+      firstTasksToStart.add(taskId);
+      firstTaskStartTime = startTime;
+    }
+    if ((finishTime > 0) &&
+        (lastTaskFinishTime == -1
+            || lastTaskFinishTime <= finishTime)) {
+      if (lastTaskFinishTime != finishTime) {
+        // Strictly later than everything seen so far: drop the old ties.
+        lastTasksToFinish.clear();
+      }
+      lastTasksToFinish.add(taskId);
+      lastTaskFinishTime = finishTime;
+    }
+
+    // Constant-first comparison so a report whose state was never set is
+    // simply ignored instead of throwing a NullPointerException.
+    if (!TaskState.SUCCEEDED.equals(taskReport.getTaskState())) {
+      // ignore non-successful tasks when calculating durations
+      return;
+    }
+
+    long taskDuration = finishTime - startTime;
+    if (taskDuration < 0) {
+      return;
+    }
+
+    ++numSuccessfulTasks;
+    if (minTaskDuration == -1
+        || minTaskDuration >= taskDuration) {
+      if (minTaskDuration != taskDuration) {
+        shortestDurationTasks.clear();
+      }
+      minTaskDuration = taskDuration;
+      shortestDurationTasks.add(taskId);
+    }
+    if (maxTaskDuration == -1
+        || maxTaskDuration <= taskDuration) {
+      if (maxTaskDuration != taskDuration) {
+        longestDurationTasks.clear();
+      }
+      maxTaskDuration = taskDuration;
+      longestDurationTasks.add(taskId);
+    }
+
+    // Incremental mean. For the first successful task the initial -1 is
+    // multiplied by zero, so the sentinel never leaks into the average.
+    avgTaskDuration = ((avgTaskDuration * (numSuccessfulTasks-1)) + taskDuration)
+        /numSuccessfulTasks;
+  }
+
+  /** Appends the given ids to {@code sb} as {@code [ id1,id2 ]}. */
+  private void appendTaskIdSet(StringBuilder sb,
+      Set<TezTaskID> taskIDs) {
+    sb.append("[ ");
+    boolean first = true;
+    if (taskIDs != null) {
+      for (TezTaskID tezTaskID : taskIDs) {
+        if (!first) {
+          sb.append(",");
+        } else {
+          first = false;
+        }
+        sb.append(tezTaskID.toString());
+      }
+    }
+    sb.append(" ]");
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("firstTaskStartTime=").append(firstTaskStartTime)
+        .append(", firstTasksToStart=");
+    appendTaskIdSet(sb, firstTasksToStart);
+    sb.append(", lastTaskFinishTime=").append(lastTaskFinishTime)
+        .append(", lastTasksToFinish=");
+    appendTaskIdSet(sb, lastTasksToFinish);
+    sb.append(", minTaskDuration=").append(minTaskDuration)
+        .append(", maxTaskDuration=").append(maxTaskDuration)
+        .append(", avgTaskDuration=").append(avgTaskDuration)
+        .append(", numSuccessfulTasks=").append(numSuccessfulTasks)
+        .append(", shortestDurationTasks=");
+    appendTaskIdSet(sb, shortestDurationTasks);
+    sb.append(", longestDurationTasks=");
+    appendTaskIdSet(sb, longestDurationTasks);
+    return sb.toString();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 6f07c91..8321a38 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -25,6 +25,7 @@ import java.io.OutputStream;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
@@ -53,11 +54,13 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
private String diagnostics;
private TezCounters tezCounters;
private boolean fromSummary = false;
+ private VertexStats vertexStats;
public VertexFinishedEvent(TezVertexID vertexId,
- String vertexName, long initRequestedTime, long initedTime, long startRequestedTime, long startedTime, long finishTime,
- VertexState state, String diagnostics,
- TezCounters counters) {
+ String vertexName, long initRequestedTime, long initedTime,
+ long startRequestedTime, long startedTime, long finishTime,
+ VertexState state, String diagnostics, TezCounters counters,
+ VertexStats vertexStats) {
this.vertexName = vertexName;
this.vertexID = vertexId;
this.initRequestedTime = initRequestedTime;
@@ -67,7 +70,8 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
this.finishTime = finishTime;
this.state = state;
this.diagnostics = diagnostics;
- tezCounters = counters;
+ this.tezCounters = counters;
+ this.vertexStats = vertexStats;
}
public VertexFinishedEvent() {
@@ -169,7 +173,9 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
+ ", diagnostics=" + diagnostics
+ ", counters=" + ( tezCounters == null ? "null" :
tezCounters.toString()
- .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
+ .replaceAll("\\n", ", ").replaceAll("\\s+", " "))
+ + ", vertexStats=" + (vertexStats == null ? "null"
+ : vertexStats.toString());
}
public TezVertexID getVertexID() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
index 8f680ce..4050df3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
@@ -57,6 +57,7 @@ public class ATSConstants {
public static final String STATUS = "status";
public static final String DIAGNOSTICS = "diagnostics";
public static final String COUNTERS = "counters";
+ public static final String STATS = "stats";
public static final String NUM_TASKS = "numTasks";
public static final String PROCESSOR_CLASS_NAME = "processorClassName";
public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 782fff1..b8ea673 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -18,10 +18,14 @@
package org.apache.tez.dag.history.utils;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -149,4 +153,52 @@ public class DAGUtils {
return jsonObject;
}
+ /**
+  * Flattens a {@link VertexStats} into an insertion-ordered map.
+  *
+  * Time and duration values are stored as numbers; each task-id set is
+  * stored as its {@code toString()} form.
+  *
+  * @param vertexStats stats to convert, may be null
+  * @return a new map; empty when {@code vertexStats} is null
+  */
+ public static Map<String,Object> convertVertexStatsToATSMap(
+     VertexStats vertexStats) {
+   final Map<String,Object> atsMap = new LinkedHashMap<String, Object>();
+   if (vertexStats != null) {
+     // Earliest start and the task(s) that achieved it.
+     atsMap.put("firstTaskStartTime", vertexStats.getFirstTaskStartTime());
+     final Object firstStarters = vertexStats.getFirstTaskToStart();
+     if (firstStarters != null) {
+       atsMap.put("firstTaskToStart", firstStarters.toString());
+     }
+
+     // Latest finish and the task(s) that achieved it.
+     atsMap.put("lastTaskFinishTime", vertexStats.getLastTaskFinishTime());
+     final Object lastFinishers = vertexStats.getLastTaskToFinish();
+     if (lastFinishers != null) {
+       atsMap.put("lastTaskToFinish", lastFinishers.toString());
+     }
+
+     // Duration summary.
+     atsMap.put("minTaskDuration", vertexStats.getMinTaskDuration());
+     atsMap.put("maxTaskDuration", vertexStats.getMaxTaskDuration());
+     atsMap.put("avgTaskDuration", vertexStats.getAvgTaskDuration());
+
+     final Object shortest = vertexStats.getShortestDurationTask();
+     if (shortest != null) {
+       atsMap.put("shortestDurationTask", shortest.toString());
+     }
+     final Object longest = vertexStats.getLongestDurationTask();
+     if (longest != null) {
+       atsMap.put("longestDurationTask", longest.toString());
+     }
+   }
+   return atsMap;
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexStats.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexStats.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexStats.java
new file mode 100644
index 0000000..47b6366
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexStats.java
@@ -0,0 +1,159 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Assert;
+import org.junit.Test;
+
+// Unit test for VertexStats: feeds a sequence of task reports into one
+// accumulator and checks the aggregate after every step.
+public class TestVertexStats {
+
+ @Test
+ public void testBasicStats() {
+ // A fresh accumulator reports -1 for every time/duration and empty sets.
+ VertexStats stats = new VertexStats();
+ Assert.assertEquals(-1, stats.firstTaskStartTime);
+ Assert.assertEquals(-1, stats.lastTaskFinishTime);
+ Assert.assertEquals(-1, stats.minTaskDuration);
+ Assert.assertEquals(-1, stats.maxTaskDuration);
+ Assert.assertTrue(-1 == stats.avgTaskDuration);
+ Assert.assertEquals(0, stats.firstTasksToStart.size());
+ Assert.assertEquals(0, stats.lastTasksToFinish.size());
+ Assert.assertEquals(0, stats.shortestDurationTasks.size());
+ Assert.assertEquals(0, stats.longestDurationTasks.size());
+
+ TezVertexID tezVertexID = TezVertexID.getInstance(
+ TezDAGID.getInstance(
+ ApplicationId.newInstance(100l, 1), 1), 1);
+ TezTaskID tezTaskID1 = TezTaskID.getInstance(tezVertexID, 1);
+ TezTaskID tezTaskID2 = TezTaskID.getInstance(tezVertexID, 2);
+ TezTaskID tezTaskID3 = TezTaskID.getInstance(tezVertexID, 3);
+ TezTaskID tezTaskID4 = TezTaskID.getInstance(tezVertexID, 4);
+ TezTaskID tezTaskID5 = TezTaskID.getInstance(tezVertexID, 5);
+ TezTaskID tezTaskID6 = TezTaskID.getInstance(tezVertexID, 6);
+
+ // Task 1: SUCCEEDED, 100 -> 200. The only task so far, so it defines
+ // every extreme and the average (duration 100).
+ stats.updateStats(new TaskReportImpl(tezTaskID1,
+ TaskState.SUCCEEDED, 1, 100, 200));
+ Assert.assertEquals(100, stats.firstTaskStartTime);
+ Assert.assertEquals(200, stats.lastTaskFinishTime);
+ Assert.assertEquals(100, stats.minTaskDuration);
+ Assert.assertEquals(100, stats.maxTaskDuration);
+ Assert.assertTrue(100 == stats.avgTaskDuration);
+ Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID1));
+ Assert.assertTrue(stats.lastTasksToFinish.contains(tezTaskID1));
+ Assert.assertTrue(stats.shortestDurationTasks.contains(tezTaskID1));
+ Assert.assertTrue(stats.longestDurationTasks.contains(tezTaskID1));
+ Assert.assertEquals(1, stats.firstTasksToStart.size());
+ Assert.assertEquals(1, stats.lastTasksToFinish.size());
+ Assert.assertEquals(1, stats.shortestDurationTasks.size());
+ Assert.assertEquals(1, stats.longestDurationTasks.size());
+
+ // Task 2: FAILED, 150 -> 300. Becomes the last task to finish, but a
+ // non-successful task leaves the duration statistics untouched.
+ stats.updateStats(new TaskReportImpl(tezTaskID2,
+ TaskState.FAILED, 1, 150, 300));
+ Assert.assertEquals(100, stats.firstTaskStartTime);
+ Assert.assertEquals(300, stats.lastTaskFinishTime);
+ Assert.assertEquals(100, stats.minTaskDuration);
+ Assert.assertEquals(100, stats.maxTaskDuration);
+ Assert.assertTrue(100 == stats.avgTaskDuration);
+ Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID1));
+ Assert.assertTrue(stats.lastTasksToFinish.contains(tezTaskID2));
+ Assert.assertTrue(stats.shortestDurationTasks.contains(tezTaskID1));
+ Assert.assertTrue(stats.longestDurationTasks.contains(tezTaskID1));
+ Assert.assertEquals(1, stats.firstTasksToStart.size());
+ Assert.assertEquals(1, stats.lastTasksToFinish.size());
+ Assert.assertEquals(1, stats.shortestDurationTasks.size());
+ Assert.assertEquals(1, stats.longestDurationTasks.size());
+
+ // Task 3: RUNNING, 50 -> 550. Takes over both the earliest start and the
+ // latest finish; duration statistics still only reflect task 1.
+ stats.updateStats(new TaskReportImpl(tezTaskID3,
+ TaskState.RUNNING, 1, 50, 550));
+ Assert.assertEquals(50, stats.firstTaskStartTime);
+ Assert.assertEquals(550, stats.lastTaskFinishTime);
+ Assert.assertEquals(100, stats.minTaskDuration);
+ Assert.assertEquals(100, stats.maxTaskDuration);
+ Assert.assertTrue(100 == stats.avgTaskDuration);
+ Assert.assertTrue(stats.shortestDurationTasks.contains(tezTaskID1));
+ Assert.assertTrue(stats.longestDurationTasks.contains(tezTaskID1));
+ Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID3));
+ Assert.assertTrue(stats.lastTasksToFinish.contains(tezTaskID3));
+ Assert.assertEquals(1, stats.firstTasksToStart.size());
+ Assert.assertEquals(1, stats.lastTasksToFinish.size());
+ Assert.assertEquals(1, stats.shortestDurationTasks.size());
+ Assert.assertEquals(1, stats.longestDurationTasks.size());
+
+ // Task 4: SUCCEEDED, 50 -> 450. Ties task 3 for earliest start and is the
+ // new longest task (400); average of 100 and 400 is 250.
+ stats.updateStats(new TaskReportImpl(tezTaskID4,
+ TaskState.SUCCEEDED, 1, 50, 450));
+ Assert.assertEquals(50, stats.firstTaskStartTime);
+ Assert.assertEquals(550, stats.lastTaskFinishTime);
+ Assert.assertEquals(100, stats.minTaskDuration);
+ Assert.assertEquals(400, stats.maxTaskDuration);
+ Assert.assertTrue(250 == stats.avgTaskDuration);
+ Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID4));
+ Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID3));
+ Assert.assertTrue(stats.lastTasksToFinish.contains(tezTaskID3));
+ Assert.assertTrue(stats.shortestDurationTasks.contains(tezTaskID1));
+ Assert.assertTrue(stats.longestDurationTasks.contains(tezTaskID4));
+ Assert.assertEquals(2, stats.firstTasksToStart.size());
+ Assert.assertEquals(1, stats.lastTasksToFinish.size());
+ Assert.assertEquals(1, stats.shortestDurationTasks.size());
+ Assert.assertEquals(1, stats.longestDurationTasks.size());
+
+ // Task 5: SUCCEEDED, 50 -> 450 (same times as task 4). Joins the ties for
+ // earliest start and longest duration; average of 100, 400, 400 is 300.
+ stats.updateStats(new TaskReportImpl(tezTaskID5,
+ TaskState.SUCCEEDED, 1, 50, 450));
+ Assert.assertEquals(50, stats.firstTaskStartTime);
+ Assert.assertEquals(550, stats.lastTaskFinishTime);
+ Assert.assertEquals(100, stats.minTaskDuration);
+ Assert.assertEquals(400, stats.maxTaskDuration);
+ Assert.assertTrue(300 == stats.avgTaskDuration);
+ Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID5));
+ Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID4));
+ Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID3));
+ Assert.assertTrue(stats.lastTasksToFinish.contains(tezTaskID3));
+ Assert.assertTrue(stats.shortestDurationTasks.contains(tezTaskID1));
+ Assert.assertTrue(stats.longestDurationTasks.contains(tezTaskID4));
+ Assert.assertTrue(stats.longestDurationTasks.contains(tezTaskID5));
+ Assert.assertEquals(3, stats.firstTasksToStart.size());
+ Assert.assertEquals(1, stats.lastTasksToFinish.size());
+ Assert.assertEquals(1, stats.shortestDurationTasks.size());
+ Assert.assertEquals(2, stats.longestDurationTasks.size());
+
+ // Task 6: SUCCEEDED, 450 -> 550. Ties task 3 for latest finish and task 1
+ // for shortest duration; average of 100, 400, 400, 100 is 250.
+ stats.updateStats(new TaskReportImpl(tezTaskID6,
+ TaskState.SUCCEEDED, 1, 450, 550));
+ Assert.assertEquals(50, stats.firstTaskStartTime);
+ Assert.assertEquals(550, stats.lastTaskFinishTime);
+ Assert.assertEquals(100, stats.minTaskDuration);
+ Assert.assertEquals(400, stats.maxTaskDuration);
+ Assert.assertTrue(250 == stats.avgTaskDuration);
+ Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID5));
+ Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID4));
+ Assert.assertTrue(stats.firstTasksToStart.contains(tezTaskID3));
+ Assert.assertTrue(stats.lastTasksToFinish.contains(tezTaskID3));
+ Assert.assertTrue(stats.lastTasksToFinish.contains(tezTaskID6));
+ Assert.assertTrue(stats.shortestDurationTasks.contains(tezTaskID1));
+ Assert.assertTrue(stats.shortestDurationTasks.contains(tezTaskID6));
+ Assert.assertTrue(stats.longestDurationTasks.contains(tezTaskID4));
+ Assert.assertTrue(stats.longestDurationTasks.contains(tezTaskID5));
+ Assert.assertEquals(3, stats.firstTasksToStart.size());
+ Assert.assertEquals(2, stats.lastTasksToFinish.size());
+ Assert.assertEquals(2, stats.shortestDurationTasks.size());
+ Assert.assertEquals(2, stats.longestDurationTasks.size());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6a258576/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 2cf3eaf..b9cae70 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -33,6 +33,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
@@ -312,7 +313,7 @@ public class TestHistoryEventsProtoConversion {
new VertexFinishedEvent(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
"vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
- null, null);
+ null, null, null);
VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
@@ -328,7 +329,7 @@ public class TestHistoryEventsProtoConversion {
new VertexFinishedEvent(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
"vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
- "diagnose", new TezCounters());
+ "diagnose", new TezCounters(), new VertexStats());
VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());