You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2015/12/07 23:42:57 UTC
[4/5] tez git commit: TEZ-2973. Backport Analyzers to branch-0.7
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
new file mode 100644
index 0000000..5fb760c
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
@@ -0,0 +1,586 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+import org.apache.commons.collections.BidiMap;
+import org.apache.commons.collections.bidimap.DualHashBidiMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringInterner;
+import org.apache.tez.client.CallerContext;
+import org.apache.tez.dag.api.event.VertexState;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class DagInfo extends BaseInfo {
+
+ private static final Log LOG = LogFactory.getLog(DagInfo.class);
+
+ //Fields populated via JSON
+ private final String name;
+ private final long startTime;
+ private final long endTime;
+ private final long submitTime;
+ private final int failedTasks;
+ private final String dagId;
+ private final int numVertices;
+ private final String status;
+ private final String diagnostics;
+ private VersionInfo versionInfo;
+ private CallerContext callerContext;
+
+ //VertexID --> VertexName & vice versa
+ private final BidiMap vertexNameIDMapping;
+
+ //edgeId to EdgeInfo mapping
+ private final Map<Integer, EdgeInfo> edgeInfoMap;
+
+ //Only for internal parsing (vertexname mapping)
+ private Map<String, BasicVertexInfo> basicVertexInfoMap;
+
+ //VertexName --> VertexInfo
+ private Map<String, VertexInfo> vertexNameMap;
+
+ private Multimap<Container, TaskAttemptInfo> containerMapping;
+
+ DagInfo(JSONObject jsonObject) throws JSONException {
+ super(jsonObject);
+
+ vertexNameMap = Maps.newHashMap();
+ vertexNameIDMapping = new DualHashBidiMap();
+ edgeInfoMap = Maps.newHashMap();
+ basicVertexInfoMap = Maps.newHashMap();
+ containerMapping = LinkedHashMultimap.create();
+
+ Preconditions.checkArgument(jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase
+ (Constants.TEZ_DAG_ID));
+
+ dagId = StringInterner.weakIntern(jsonObject.getString(Constants.ENTITY));
+
+ //Parse additional Info
+ JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
+ startTime = otherInfoNode.optLong(Constants.START_TIME);
+ endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+ //TODO: Not getting populated correctly for lots of jobs. Verify
+ submitTime = otherInfoNode.optLong(Constants.START_REQUESTED_TIME);
+ diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
+ failedTasks = otherInfoNode.optInt(Constants.NUM_FAILED_TASKS);
+ JSONObject dagPlan = otherInfoNode.optJSONObject(Constants.DAG_PLAN);
+ name = StringInterner.weakIntern((dagPlan != null) ? (dagPlan.optString(Constants.DAG_NAME)) : null);
+ if (dagPlan != null) {
+ JSONArray vertices = dagPlan.optJSONArray(Constants.VERTICES);
+ if (vertices != null) {
+ numVertices = vertices.length();
+ } else {
+ numVertices = 0;
+ }
+ parseDAGPlan(dagPlan);
+ } else {
+ numVertices = 0;
+ }
+ status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS));
+
+ //parse name id mapping
+ JSONObject vertexIDMappingJson = otherInfoNode.optJSONObject(Constants.VERTEX_NAME_ID_MAPPING);
+ if (vertexIDMappingJson != null) {
+ //get vertex name
+ for (Map.Entry<String, BasicVertexInfo> entry : basicVertexInfoMap.entrySet()) {
+ String vertexId = vertexIDMappingJson.optString(entry.getKey());
+ //vertexName --> vertexId
+ vertexNameIDMapping.put(entry.getKey(), vertexId);
+ }
+ }
+ }
+
+ public static DagInfo create(JSONObject jsonObject) throws JSONException {
+ DagInfo dagInfo = new DagInfo(jsonObject);
+ return dagInfo;
+ }
+
+ private void parseDAGPlan(JSONObject dagPlan) throws JSONException {
+ int version = dagPlan.optInt(Constants.VERSION, 1);
+ parseEdges(dagPlan.optJSONArray(Constants.EDGES));
+
+ JSONArray verticesInfo = dagPlan.optJSONArray(Constants.VERTICES);
+ parseBasicVertexInfo(verticesInfo);
+
+ if (version > 1) {
+ parseDAGContext(dagPlan.optJSONObject(Constants.DAG_CONTEXT));
+ }
+ }
+
+ private void parseDAGContext(JSONObject callerContextInfo) {
+ if (callerContextInfo == null) {
+ LOG.info("No DAG Caller Context available");
+ return;
+ }
+ String context = callerContextInfo.optString(Constants.CONTEXT);
+ String callerId = callerContextInfo.optString(Constants.CALLER_ID);
+ String callerType = callerContextInfo.optString(Constants.CALLER_TYPE);
+ String description = callerContextInfo.optString(Constants.DESCRIPTION);
+
+ this.callerContext = CallerContext.create(context, description);
+ if (callerId != null && !callerId.isEmpty() && callerType != null && !callerType.isEmpty()) {
+ this.callerContext.setCallerIdAndType(callerId, callerType);
+ } else {
+ LOG.info("No DAG Caller Context Id and Type available");
+ }
+
+ }
+
+ private void parseBasicVertexInfo(JSONArray verticesInfo) throws JSONException {
+ if (verticesInfo == null) {
+ LOG.info("No vertices available.");
+ return;
+ }
+
+ //Parse basic information available in DAG for vertex and edges
+ for (int i = 0; i < verticesInfo.length(); i++) {
+ BasicVertexInfo basicVertexInfo = new BasicVertexInfo();
+
+ JSONObject vJson = verticesInfo.getJSONObject(i);
+ basicVertexInfo.vertexName =
+ vJson.optString(Constants.VERTEX_NAME);
+ JSONArray inEdges = vJson.optJSONArray(Constants.IN_EDGE_IDS);
+ if (inEdges != null) {
+ String[] inEdgeIds = new String[inEdges.length()];
+ for (int j = 0; j < inEdges.length(); j++) {
+ inEdgeIds[j] = inEdges.get(j).toString();
+ }
+ basicVertexInfo.inEdgeIds = inEdgeIds;
+ }
+
+ JSONArray outEdges = vJson.optJSONArray(Constants.OUT_EDGE_IDS);
+ if (outEdges != null) {
+ String[] outEdgeIds = new String[outEdges.length()];
+ for (int j = 0; j < outEdges.length(); j++) {
+ outEdgeIds[j] = outEdges.get(j).toString();
+ }
+ basicVertexInfo.outEdgeIds = outEdgeIds;
+ }
+
+ JSONArray addInputsJson =
+ vJson.optJSONArray(Constants.ADDITIONAL_INPUTS);
+ basicVertexInfo.additionalInputs = parseAdditionalDetailsForVertex(addInputsJson);
+
+ JSONArray addOutputsJson =
+ vJson.optJSONArray(Constants.ADDITIONAL_OUTPUTS);
+ basicVertexInfo.additionalOutputs = parseAdditionalDetailsForVertex(addOutputsJson);
+
+ basicVertexInfoMap.put(basicVertexInfo.vertexName, basicVertexInfo);
+ }
+ }
+
+ /**
+ * get additional details available for every vertex in the dag
+ *
+ * @param jsonArray
+ * @return AdditionalInputOutputDetails[]
+ * @throws JSONException
+ */
+ private AdditionalInputOutputDetails[] parseAdditionalDetailsForVertex(JSONArray jsonArray) throws
+ JSONException {
+ if (jsonArray != null) {
+ AdditionalInputOutputDetails[]
+ additionalInputOutputDetails = new AdditionalInputOutputDetails[jsonArray.length()];
+ for (int j = 0; j < jsonArray.length(); j++) {
+ String name = jsonArray.getJSONObject(j).optString(
+ Constants.NAME);
+ String clazz = jsonArray.getJSONObject(j).optString(
+ Constants.CLASS);
+ String initializer =
+ jsonArray.getJSONObject(j).optString(Constants.INITIALIZER);
+ String userPayloadText = jsonArray.getJSONObject(j).optString(
+ Constants.USER_PAYLOAD_TEXT);
+
+ additionalInputOutputDetails[j] =
+ new AdditionalInputOutputDetails(name, clazz, initializer, userPayloadText);
+
+ }
+ return additionalInputOutputDetails;
+ }
+ return null;
+ }
+
+ /**
+ * Parse edge details in the DAG
+ *
+ * @param edgesArray
+ *
+ * @throws JSONException
+ */
+ private void parseEdges(JSONArray edgesArray) throws JSONException {
+ if (edgesArray == null) {
+ return;
+ }
+ for (int i = 0; i < edgesArray.length(); i++) {
+ JSONObject edge = edgesArray.getJSONObject(i);
+ Integer edgeId = edge.optInt(Constants.EDGE_ID);
+ String inputVertexName =
+ edge.optString(Constants.INPUT_VERTEX_NAME);
+ String outputVertexName =
+ edge.optString(Constants.OUTPUT_VERTEX_NAME);
+ String dataMovementType =
+ edge.optString(Constants.DATA_MOVEMENT_TYPE);
+ String edgeSourceClass =
+ edge.optString(Constants.EDGE_SOURCE_CLASS);
+ String edgeDestinationClass =
+ edge.optString(Constants.EDGE_DESTINATION_CLASS);
+ String inputUserPayloadAsText =
+ edge.optString(Constants.INPUT_PAYLOAD_TEXT);
+ String outputUserPayloadAsText =
+ edge.optString(Constants.OUTPUT_PAYLOAD_TEXT);
+ EdgeInfo edgeInfo = new EdgeInfo(inputVertexName, outputVertexName,
+ dataMovementType, edgeSourceClass, edgeDestinationClass, inputUserPayloadAsText,
+ outputUserPayloadAsText);
+ edgeInfoMap.put(edgeId, edgeInfo);
+ }
+ }
+
+ static class BasicVertexInfo {
+ String vertexName;
+ String[] inEdgeIds;
+ String[] outEdgeIds;
+ AdditionalInputOutputDetails[] additionalInputs;
+ AdditionalInputOutputDetails[] additionalOutputs;
+ }
+
+ void addVertexInfo(VertexInfo vertexInfo) {
+ BasicVertexInfo basicVertexInfo = basicVertexInfoMap.get(vertexInfo.getVertexName());
+
+ Preconditions.checkArgument(basicVertexInfo != null,
+ "VerteName " + vertexInfo.getVertexName()
+ + " not present in DAG's vertices " + basicVertexInfoMap.entrySet());
+
+ //populate additional information in VertexInfo
+ if (basicVertexInfo.additionalInputs != null) {
+ vertexInfo.setAdditionalInputInfoList(Arrays.asList(basicVertexInfo.additionalInputs));
+ }
+ if (basicVertexInfo.additionalOutputs != null) {
+ vertexInfo.setAdditionalOutputInfoList(Arrays.asList(basicVertexInfo.additionalOutputs));
+ }
+
+ //Populate edge information in vertex
+ if (basicVertexInfo.inEdgeIds != null) {
+ for (String edge : basicVertexInfo.inEdgeIds) {
+ EdgeInfo edgeInfo = edgeInfoMap.get(Integer.parseInt(edge));
+ Preconditions.checkState(edgeInfo != null, "EdgeId " + edge + " not present in DAG");
+ vertexInfo.addInEdge(edgeInfo);
+ }
+ }
+
+ if (basicVertexInfo.outEdgeIds != null) {
+ for (String edge : basicVertexInfo.outEdgeIds) {
+ EdgeInfo edgeInfo = edgeInfoMap.get(Integer.parseInt(edge));
+ Preconditions.checkState(edgeInfo != null, "EdgeId " + edge + " not present in DAG");
+ vertexInfo.addOutEdge(edgeInfo);
+ }
+ }
+
+ vertexNameMap.put(vertexInfo.getVertexName(), vertexInfo);
+ }
+
+ void setVersionInfo(VersionInfo versionInfo) {
+ this.versionInfo = versionInfo;
+ }
+
+ void addContainerMapping(Container container, TaskAttemptInfo taskAttemptInfo) {
+ this.containerMapping.put(container, taskAttemptInfo);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ sb.append("dagID=").append(getDagId()).append(", ");
+ sb.append("dagName=").append(getName()).append(", ");
+ sb.append("status=").append(getStatus()).append(", ");
+ sb.append("startTime=").append(getStartTimeInterval()).append(", ");
+ sb.append("submitTime=").append(getSubmitTime()).append(", ");
+ sb.append("endTime=").append(getFinishTimeInterval()).append(", ");
+ sb.append("timeTaken=").append(getTimeTaken()).append(", ");
+ sb.append("diagnostics=").append(getDiagnostics()).append(", ");
+ sb.append("vertexNameIDMapping=").append(getVertexNameIDMapping()).append(", ");
+ sb.append("failedTasks=").append(getFailedTaskCount()).append(", ");
+ sb.append("events=").append(getEvents()).append(", ");
+ sb.append("status=").append(getStatus());
+ sb.append("]");
+ return sb.toString();
+ }
+
+ public Multimap<Container, TaskAttemptInfo> getContainerMapping() {
+ return Multimaps.unmodifiableMultimap(containerMapping);
+ }
+
+ public final VersionInfo getVersionInfo() {
+ return versionInfo;
+ }
+
+ public final CallerContext getCallerContext() {
+ return callerContext;
+ }
+
+ public final String getName() {
+ return name;
+ }
+
+ public final Collection<EdgeInfo> getEdges() {
+ return Collections.unmodifiableCollection(edgeInfoMap.values());
+ }
+
+ public final long getSubmitTime() {
+ return submitTime;
+ }
+
+ public final long getStartTime() {
+ return startTime;
+ }
+
+ public final long getFinishTime() {
+ return endTime;
+ }
+
+ /**
+ * Reference start time for the DAG. Vertex, Task, TaskAttempt would map on to this.
+ * If absolute start time is needed, call getAbsStartTime().
+ *
+ * @return starting time w.r.t to dag
+ */
+ public final long getStartTimeInterval() {
+ return 0;
+ }
+
+ @Override
+ public final long getFinishTimeInterval() {
+ long dagEndTime = (endTime - startTime);
+ if (dagEndTime < 0) {
+ //probably dag is not complete or failed in middle. get the last task attempt time
+ for (VertexInfo vertexInfo : getVertices()) {
+ dagEndTime = (vertexInfo.getFinishTimeInterval() > dagEndTime) ? vertexInfo.getFinishTimeInterval() : dagEndTime;
+ }
+ }
+ return dagEndTime;
+ }
+
+ public final long getTimeTaken() {
+ return getFinishTimeInterval();
+ }
+
+ public final String getStatus() {
+ return status;
+ }
+
+ /**
+ * Get vertexInfo for a given vertexid
+ *
+ * @param vertexId
+ * @return VertexInfo
+ */
+ public VertexInfo getVertexFromId(String vertexId) {
+ return vertexNameMap.get(vertexNameIDMapping.getKey(vertexId));
+ }
+
+ /**
+ * Get vertexInfo for a given vertex name
+ *
+ * @param vertexName
+ * @return VertexInfo
+ */
+ public final VertexInfo getVertex(String vertexName) {
+ return vertexNameMap.get(vertexName);
+ }
+
+ public final String getDiagnostics() {
+ return diagnostics;
+ }
+
+ /**
+ * Get all vertices
+ *
+ * @return List<VertexInfo>
+ */
+ public final List<VertexInfo> getVertices() {
+ List<VertexInfo> vertices = Lists.newLinkedList(vertexNameMap.values());
+ Collections.sort(vertices, new Comparator<VertexInfo>() {
+
+ @Override public int compare(VertexInfo o1, VertexInfo o2) {
+ return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 :
+ ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ?
+ 0 : 1);
+ }
+ });
+ return Collections.unmodifiableList(vertices);
+ }
+
+ /**
+ * Get list of failed vertices
+ *
+ * @return List<VertexInfo>
+ */
+ public final List<VertexInfo> getFailedVertices() {
+ return getVertices(VertexState.FAILED);
+ }
+
+ /**
+ * Get list of killed vertices
+ *
+ * @return List<VertexInfo>
+ */
+ public final List<VertexInfo> getKilledVertices() {
+ return getVertices(VertexState.KILLED);
+ }
+
+ /**
+ * Get list of failed vertices
+ *
+ * @return List<VertexInfo>
+ */
+ public final List<VertexInfo> getSuccessfullVertices() {
+ return getVertices(VertexState.SUCCEEDED);
+ }
+
+ /**
+ * Get list of vertices belonging to a specific state
+ *
+ * @param state
+ * @return Collection<VertexInfo>
+ */
+ public final List<VertexInfo> getVertices(final VertexState state) {
+ return Collections.unmodifiableList(Lists.newLinkedList(Iterables.filter(Lists.newLinkedList
+ (vertexNameMap.values()), new Predicate<VertexInfo>() {
+ @Override public boolean apply(VertexInfo input) {
+ return input.getStatus() != null && input.getStatus().equals(state.toString());
+ }
+ }
+ )
+ )
+ );
+ }
+
+ public final Map<String, VertexInfo> getVertexMapping() {
+ return Collections.unmodifiableMap(vertexNameMap);
+ }
+
+ private Ordering<VertexInfo> getVertexOrdering() {
+ return Ordering.from(new Comparator<VertexInfo>() {
+ @Override public int compare(VertexInfo o1, VertexInfo o2) {
+ return (o1.getTimeTaken() < o2.getTimeTaken()) ? -1 :
+ ((o1.getTimeTaken() == o2.getTimeTaken()) ?
+ 0 : 1);
+ }
+ });
+ }
+
+ /**
+ * Get the slowest vertex in the DAG
+ *
+ * @return VertexInfo
+ */
+ public final VertexInfo getSlowestVertex() {
+ List<VertexInfo> vertexInfoList = getVertices();
+ if (vertexInfoList.size() == 0) {
+ return null;
+ }
+ return getVertexOrdering().max(vertexInfoList);
+ }
+
+ /**
+ * Get the slowest vertex in the DAG
+ *
+ * @return VertexInfo
+ */
+ public final VertexInfo getFastestVertex() {
+ List<VertexInfo> vertexInfoList = getVertices();
+ if (vertexInfoList.size() == 0) {
+ return null;
+ }
+ return getVertexOrdering().min(vertexInfoList);
+ }
+
+ /**
+ * Get node details for this DAG. Would be useful for analyzing node to tasks.
+ *
+ * @return Multimap<String, TaskAttemptInfo> taskAttempt details at every node
+ */
+ public final Multimap<String, TaskAttemptInfo> getNodeDetails() {
+ Multimap<String, TaskAttemptInfo> nodeDetails = LinkedListMultimap.create();
+ for (VertexInfo vertexInfo : getVertices()) {
+ Multimap<Container, TaskAttemptInfo> containerMapping = vertexInfo.getContainersMapping();
+ for (Map.Entry<Container, TaskAttemptInfo> entry : containerMapping.entries()) {
+ nodeDetails.put(entry.getKey().getHost(), entry.getValue());
+ }
+ }
+ return nodeDetails;
+ }
+
+ /**
+ * Get containers used for this DAG
+ *
+ * @return Multimap<Container, TaskAttemptInfo> task attempt details at every container
+ */
+ public final Multimap<Container, TaskAttemptInfo> getContainersToTaskAttemptMapping() {
+ List<VertexInfo> VertexInfoList = getVertices();
+ Multimap<Container, TaskAttemptInfo> containerMapping = LinkedHashMultimap.create();
+
+ for (VertexInfo vertexInfo : VertexInfoList) {
+ containerMapping.putAll(vertexInfo.getContainersMapping());
+ }
+ return Multimaps.unmodifiableMultimap(containerMapping);
+ }
+
+ public final Map getVertexNameIDMapping() {
+ return vertexNameIDMapping;
+ }
+
+ public final int getNumVertices() {
+ return numVertices;
+ }
+
+ public final String getDagId() {
+ return dagId;
+ }
+
+ public final int getFailedTaskCount() {
+ return failedTasks;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java
new file mode 100644
index 0000000..ab8e831
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class EdgeInfo {
+
+ private final String inputVertexName;
+ private final String outputVertexName;
+ private final String dataMovementType;
+ private final String edgeSourceClass;
+ private final String edgeDestinationClass;
+ private final String inputUserPayloadAsText;
+ private final String outputUserPayloadAsText;
+
+ private VertexInfo sourceVertex;
+ private VertexInfo destinationVertex;
+
+ public EdgeInfo(String inputVertexName, String outputVertexName, String dataMovementType,
+ String edgeSourceClass, String edgeDestinationClass, String inputUserPayloadAsText, String
+ outputUserPayloadAsText) {
+ this.inputVertexName = inputVertexName;
+ this.outputVertexName = outputVertexName;
+ this.dataMovementType = dataMovementType;
+ this.edgeSourceClass = edgeSourceClass;
+ this.edgeDestinationClass = edgeDestinationClass;
+ this.inputUserPayloadAsText = inputUserPayloadAsText;
+ this.outputUserPayloadAsText = outputUserPayloadAsText;
+ }
+
+ public final String getInputVertexName() {
+ return inputVertexName;
+ }
+
+ public final String getOutputVertexName() {
+ return outputVertexName;
+ }
+
+ public final String getDataMovementType() {
+ return dataMovementType;
+ }
+
+ public final String getEdgeSourceClass() {
+ return edgeSourceClass;
+ }
+
+ public final String getEdgeDestinationClass() {
+ return edgeDestinationClass;
+ }
+
+ public final String getInputUserPayloadAsText() {
+ return inputUserPayloadAsText;
+ }
+
+ public final String getOutputUserPayloadAsText() {
+ return outputUserPayloadAsText;
+ }
+
+ public final VertexInfo getSourceVertex() {
+ return sourceVertex;
+ }
+
+ public final void setSourceVertex(VertexInfo sourceVertex) {
+ this.sourceVertex = sourceVertex;
+ }
+
+ public final VertexInfo getDestinationVertex() {
+ return destinationVertex;
+ }
+
+ public final void setDestinationVertex(VertexInfo destinationVertex) {
+ this.destinationVertex = destinationVertex;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ sb.append("inputVertexName=").append(inputVertexName).append(", ");
+ sb.append("outputVertexName=").append(outputVertexName).append(", ");
+ sb.append("dataMovementType=").append(dataMovementType).append(", ");
+ sb.append("edgeSourceClass=").append(edgeSourceClass).append(", ");
+ sb.append("edgeDestinationClass=").append(edgeDestinationClass).append(", ");
+ sb.append("inputUserPayloadAsText=").append(inputUserPayloadAsText).append(",");
+ sb.append("outputUserPayloadAsText=").append(outputUserPayloadAsText).append(", ");
+ sb.append("sourceVertex=").append(sourceVertex.getVertexName()).append(", ");
+ sb.append("destinationVertex=").append(destinationVertex.getVertexName()).append(", ");
+ sb.append("outputUserPayloadAsText=").append(outputUserPayloadAsText);
+ sb.append("]");
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java
new file mode 100644
index 0000000..70310f3
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
/**
 * A timestamped history event consisting of a type label and an info string.
 *
 * <p>{@link #getAbsoluteTime()} returns the timestamp given at construction, while
 * {@link #getTime()} returns it relative to a reference time that defaults to 0 and is
 * normally set to the dag start time.
 */
public class Event {

  // Payload of the event.
  private final String info;
  // Type label of the event.
  private final String type;
  // Timestamp supplied at construction.
  private final long time;

  // Offset subtracted by getTime(); typically dag start time.
  private long referenceTime;

  public Event(String info, String type, long time) {
    this.info = info;
    this.type = type;
    this.time = time;
  }

  void setReferenceTime(long refTime) {
    referenceTime = refTime;
  }

  public final String getInfo() {
    return info;
  }

  public final String getType() {
    return type;
  }

  public final long getAbsoluteTime() {
    return time;
  }

  /**
   * @return the event time minus the reference time
   */
  public final long getTime() {
    final long relativeTime = time - referenceTime;
    return relativeTime;
  }

  @Override
  public String toString() {
    // Reports the absolute time, not the reference-relative one.
    return new StringBuilder("[info=").append(info)
        .append(", type=").append(type)
        .append(", time=").append(time)
        .append(']')
        .toString();
  }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
new file mode 100644
index 0000000..d373513
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+
+import org.apache.hadoop.util.StringInterner;
+import org.apache.tez.common.ATSConstants;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.history.parser.utils.Utils;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+
+@Public
+@Evolving
+public class TaskAttemptInfo extends BaseInfo {
+
+ private static final String SUCCEEDED = StringInterner.weakIntern(TaskAttemptState.SUCCEEDED.name());
+
+ private final String taskAttemptId;
+ private final long startTime;
+ private final long endTime;
+ private final String diagnostics;
+
+ private final long creationTime;
+ private final long allocationTime;
+ private final String containerId;
+ private final String nodeId;
+ private final String status;
+ private final String logUrl;
+ private final String creationCausalTA;
+ private final String terminationCause;
+ private final long executionTimeInterval;
+ // this list is in time order - array list for easy walking
+ private final ArrayList<DataDependencyEvent> lastDataEvents = Lists.newArrayList();
+
+ private TaskInfo taskInfo;
+
+ private Container container;
+
+ public static class DataDependencyEvent {
+ String taId;
+ long timestamp;
+ public DataDependencyEvent(String id, long time) {
+ taId = id;
+ timestamp = time;
+ }
+ public long getTimestamp() {
+ return timestamp;
+ }
+ public String getTaskAttemptId() {
+ return taId;
+ }
+ }
+
+ TaskAttemptInfo(JSONObject jsonObject) throws JSONException {
+ super(jsonObject);
+
+ Preconditions.checkArgument(
+ jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase
+ (Constants.TEZ_TASK_ATTEMPT_ID));
+
+ taskAttemptId = StringInterner.weakIntern(jsonObject.optString(Constants.ENTITY));
+
+ //Parse additional Info
+ final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
+ startTime = otherInfoNode.optLong(Constants.START_TIME);
+ endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+ diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
+ creationTime = otherInfoNode.optLong(Constants.CREATION_TIME);
+ creationCausalTA = StringInterner.weakIntern(
+ otherInfoNode.optString(Constants.CREATION_CAUSAL_ATTEMPT));
+ allocationTime = otherInfoNode.optLong(Constants.ALLOCATION_TIME);
+ containerId = StringInterner.weakIntern(otherInfoNode.optString(Constants.CONTAINER_ID));
+ String id = otherInfoNode.optString(Constants.NODE_ID);
+ nodeId = StringInterner.weakIntern((id != null) ? (id.split(":")[0]) : "");
+ logUrl = otherInfoNode.optString(Constants.COMPLETED_LOGS_URL);
+
+ status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS));
+ container = new Container(containerId, nodeId);
+ if (otherInfoNode.has(Constants.LAST_DATA_EVENTS)) {
+ List<DataDependencyEvent> eventInfo = Utils.parseDataEventDependencyFromJSON(
+ otherInfoNode.optJSONObject(Constants.LAST_DATA_EVENTS));
+ long lastTime = 0;
+ for (DataDependencyEvent item : eventInfo) {
+ // check these are in time order
+ Preconditions.checkState(lastTime < item.getTimestamp());
+ lastTime = item.getTimestamp();
+ lastDataEvents.add(item);
+ }
+ }
+ terminationCause = StringInterner
+ .weakIntern(otherInfoNode.optString(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
+ executionTimeInterval = (endTime > startTime) ? (endTime - startTime) : 0;
+ }
+
+ public static Ordering<TaskAttemptInfo> orderingOnAllocationTime() {
+ return Ordering.from(new Comparator<TaskAttemptInfo>() {
+ @Override
+ public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+ return (o1.getAllocationTime() < o2.getAllocationTime() ? -1
+ : o1.getAllocationTime() > o2.getAllocationTime() ? 1 : 0);
+ }
+ });
+ }
+
+ void setTaskInfo(TaskInfo taskInfo) {
+ Preconditions.checkArgument(taskInfo != null, "Provide valid taskInfo");
+ this.taskInfo = taskInfo;
+ taskInfo.addTaskAttemptInfo(this);
+ }
+
+ @Override
+ public final long getStartTimeInterval() {
+ return startTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime());
+ }
+
+ @Override
+ public final long getFinishTimeInterval() {
+ return endTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime());
+ }
+
+ public final boolean isSucceeded() {
+ return status.equals(SUCCEEDED);
+ }
+
+ public final List<DataDependencyEvent> getLastDataEvents() {
+ return lastDataEvents;
+ }
+
+ public final long getExecutionTimeInterval() {
+ return executionTimeInterval;
+ }
+
+ public final long getPostDataExecutionTimeInterval() {
+ if (getStartTime() > 0 && getFinishTime() > 0) {
+ // start time defaults to the actual start time
+ long postDataStartTime = startTime;
+ if (getLastDataEvents() != null && !getLastDataEvents().isEmpty()) {
+ // if last data event is after the start time then use last data event time
+ long lastEventTime = getLastDataEvents().get(getLastDataEvents().size()-1).getTimestamp();
+ postDataStartTime = startTime > lastEventTime ? startTime : lastEventTime;
+ }
+ return (getFinishTime() - postDataStartTime);
+ }
+ return -1;
+ }
+
+ public final long getAllocationToEndTimeInterval() {
+ return (endTime - allocationTime);
+ }
+
+ public final long getAllocationToStartTimeInterval() {
+ return (startTime - allocationTime);
+ }
+
+ public final long getCreationToAllocationTimeInterval() {
+ return (allocationTime - creationTime);
+ }
+
+ public final long getStartTime() {
+ return startTime;
+ }
+
+ public final long getFinishTime() {
+ return endTime;
+ }
+
+ public final long getCreationTime() {
+ return creationTime;
+ }
+
+ public final DataDependencyEvent getLastDataEventInfo(long timeThreshold) {
+ for (int i=lastDataEvents.size()-1; i>=0; i--) {
+ // walk back in time until we get first event that happened before the threshold
+ DataDependencyEvent item = lastDataEvents.get(i);
+ if (item.getTimestamp() < timeThreshold) {
+ return item;
+ }
+ }
+ return null;
+ }
+
+ public final long getTimeTaken() {
+ return getFinishTimeInterval() - getStartTimeInterval();
+ }
+
+ public final long getCreationTimeInterval() {
+ return creationTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime());
+ }
+
+ public final String getCreationCausalTA() {
+ return creationCausalTA;
+ }
+
+ public final long getAllocationTime() {
+ return allocationTime;
+ }
+
+ public final String getShortName() {
+ return getTaskInfo().getVertexInfo().getVertexName() + " : " +
+ taskAttemptId.substring(taskAttemptId.lastIndexOf('_', taskAttemptId.lastIndexOf('_') - 1) + 1);
+ }
+
+ @Override
+ public final String getDiagnostics() {
+ return diagnostics;
+ }
+
+ public final String getTerminationCause() {
+ return terminationCause;
+ }
+
+ public static TaskAttemptInfo create(JSONObject taskInfoObject) throws JSONException {
+ return new TaskAttemptInfo(taskInfoObject);
+ }
+
+ public final boolean isLocalityInfoAvailable() {
+ Map<String, TezCounter> dataLocalTask = getCounter(DAGCounter.class.getName(),
+ DAGCounter.DATA_LOCAL_TASKS.toString());
+ Map<String, TezCounter> rackLocalTask = getCounter(DAGCounter.class.getName(),
+ DAGCounter.RACK_LOCAL_TASKS.toString());
+
+ Map<String, TezCounter> otherLocalTask = getCounter(DAGCounter.class.getName(),
+ DAGCounter.OTHER_LOCAL_TASKS.toString());
+
+ if (!dataLocalTask.isEmpty() || !rackLocalTask.isEmpty() || !otherLocalTask.isEmpty()) {
+ return true;
+ }
+ return false;
+ }
+
+ public final String getDetailedStatus() {
+ if (!Strings.isNullOrEmpty(getTerminationCause())) {
+ return getStatus() + ":" + getTerminationCause();
+ }
+ return getStatus();
+ }
+
+ public final TezCounter getLocalityInfo() {
+ Map<String, TezCounter> dataLocalTask = getCounter(DAGCounter.class.getName(),
+ DAGCounter.DATA_LOCAL_TASKS.toString());
+ Map<String, TezCounter> rackLocalTask = getCounter(DAGCounter.class.getName(),
+ DAGCounter.RACK_LOCAL_TASKS.toString());
+ Map<String, TezCounter> otherLocalTask = getCounter(DAGCounter.class.getName(),
+ DAGCounter.OTHER_LOCAL_TASKS.toString());
+
+ if (!dataLocalTask.isEmpty()) {
+ return dataLocalTask.get(DAGCounter.class.getName());
+ }
+
+ if (!rackLocalTask.isEmpty()) {
+ return rackLocalTask.get(DAGCounter.class.getName());
+ }
+
+ if (!otherLocalTask.isEmpty()) {
+ return otherLocalTask.get(DAGCounter.class.getName());
+ }
+ return null;
+ }
+
+ public final TaskInfo getTaskInfo() {
+ return taskInfo;
+ }
+
+ public final String getTaskAttemptId() {
+ return taskAttemptId;
+ }
+
+ public final String getNodeId() {
+ return nodeId;
+ }
+
+ public final String getStatus() {
+ return status;
+ }
+
+ public final Container getContainer() {
+ return container;
+ }
+
+ public final String getLogURL() {
+ return logUrl;
+ }
+
+ /**
+ * Get merge counter per source. Available in case of reducer task
+ *
+ * @return Map<String, TezCounter> merge phase time at every counter group level
+ */
+ public final Map<String, TezCounter> getMergePhaseTime() {
+ return getCounter(null, TaskCounter.MERGE_PHASE_TIME.name());
+ }
+
+ /**
+ * Get shuffle counter per source. Available in case of shuffle
+ *
+ * @return Map<String, TezCounter> shuffle phase time at every counter group level
+ */
+ public final Map<String, TezCounter> getShufflePhaseTime() {
+ return getCounter(null, TaskCounter.SHUFFLE_PHASE_TIME.name());
+ }
+
+ /**
+ * Get OUTPUT_BYTES counter per source. Available in case of map outputs
+ *
+ * @return Map<String, TezCounter> output bytes counter at every counter group
+ */
+ public final Map<String, TezCounter> getTaskOutputBytes() {
+ return getCounter(null, TaskCounter.OUTPUT_BYTES.name());
+ }
+
+ /**
+ * Get number of spills per source. (SPILLED_RECORDS / OUTPUT_RECORDS)
+ *
+ * @return Map<String, Long> spill count details
+ */
+ public final Map<String, Float> getSpillCount() {
+ Map<String, TezCounter> outputRecords = getCounter(null, "OUTPUT_RECORDS");
+ Map<String, TezCounter> spilledRecords = getCounter(null, "SPILLED_RECORDS");
+ Map<String, Float> result = Maps.newHashMap();
+ for (Map.Entry<String, TezCounter> entry : spilledRecords.entrySet()) {
+ String source = entry.getKey();
+ long spilledVal = entry.getValue().getValue();
+ long outputVal = outputRecords.get(source).getValue();
+ result.put(source, (spilledVal * 1.0f) / (outputVal * 1.0f));
+ }
+ return result;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ sb.append("taskAttemptId=").append(getTaskAttemptId()).append(", ");
+ sb.append("creationTime=").append(getCreationTimeInterval()).append(", ");
+ sb.append("startTime=").append(getStartTimeInterval()).append(", ");
+ sb.append("finishTime=").append(getFinishTimeInterval()).append(", ");
+ sb.append("timeTaken=").append(getTimeTaken()).append(", ");
+ sb.append("events=").append(getEvents()).append(", ");
+ sb.append("diagnostics=").append(getDiagnostics()).append(", ");
+ sb.append("container=").append(getContainer()).append(", ");
+ sb.append("nodeId=").append(getNodeId()).append(", ");
+ sb.append("logURL=").append(getLogURL()).append(", ");
+ sb.append("status=").append(getStatus());
+ sb.append("]");
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
new file mode 100644
index 0000000..c6f89d6
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+
+import org.apache.hadoop.util.StringInterner;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class TaskInfo extends BaseInfo {
+
+ private final long startTime;
+ private final long endTime;
+ private final String diagnostics;
+ private final String successfulAttemptId;
+ private final long scheduledTime;
+ private final String status;
+ private final String taskId;
+
+ private VertexInfo vertexInfo;
+
+ private Map<String, TaskAttemptInfo> attemptInfoMap = Maps
+ .newHashMap();
+
+ TaskInfo(JSONObject jsonObject) throws JSONException {
+ super(jsonObject);
+
+ Preconditions.checkArgument(
+ jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase
+ (Constants.TEZ_TASK_ID));
+
+ taskId = StringInterner.weakIntern(jsonObject.optString(Constants.ENTITY));
+
+ //Parse additional Info
+ final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
+ startTime = otherInfoNode.optLong(Constants.START_TIME);
+ endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+ diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
+ successfulAttemptId = StringInterner.weakIntern(
+ otherInfoNode.optString(Constants.SUCCESSFUL_ATTEMPT_ID));
+ scheduledTime = otherInfoNode.optLong(Constants.SCHEDULED_TIME);
+ status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS));
+ }
+
+ @Override
+ public final long getStartTimeInterval() {
+ return startTime - (vertexInfo.getDagInfo().getStartTime());
+ }
+
+ public final long getStartTime() {
+ return startTime;
+ }
+
+ public final long getFinishTime() {
+ return endTime;
+ }
+
+ @Override
+ public final long getFinishTimeInterval() {
+ long taskFinishTime = endTime - (vertexInfo.getDagInfo().getStartTime());
+ if (taskFinishTime < 0) {
+ //probably vertex is not complete or failed in middle. get the last task attempt time
+ for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
+ taskFinishTime = (attemptInfo.getFinishTimeInterval() > taskFinishTime)
+ ? attemptInfo.getFinishTimeInterval() : taskFinishTime;
+ }
+ }
+ return taskFinishTime;
+ }
+
+ @Override
+ public final String getDiagnostics() {
+ return diagnostics;
+ }
+
+ public static TaskInfo create(JSONObject taskInfoObject) throws
+ JSONException {
+ return new TaskInfo(taskInfoObject);
+ }
+
+ void addTaskAttemptInfo(TaskAttemptInfo taskAttemptInfo) {
+ attemptInfoMap.put(taskAttemptInfo.getTaskAttemptId(), taskAttemptInfo);
+ }
+
+ void setVertexInfo(VertexInfo vertexInfo) {
+ Preconditions.checkArgument(vertexInfo != null, "Provide valid vertexInfo");
+ this.vertexInfo = vertexInfo;
+ //link it to vertex
+ vertexInfo.addTaskInfo(this);
+ }
+
+ public final VertexInfo getVertexInfo() {
+ return vertexInfo;
+ }
+
+ /**
+ * Get all task attempts
+ *
+ * @return list of task attempt info
+ */
+ public final List<TaskAttemptInfo> getTaskAttempts() {
+ List<TaskAttemptInfo> attemptsList = Lists.newLinkedList(attemptInfoMap.values());
+ Collections.sort(attemptsList, orderingOnAttemptStartTime());
+ return Collections.unmodifiableList(attemptsList);
+ }
+
+ /**
+ * Get list of failed tasks
+ *
+ * @return List<TaskAttemptInfo>
+ */
+ public final List<TaskAttemptInfo> getFailedTaskAttempts() {
+ return getTaskAttempts(TaskAttemptState.FAILED);
+ }
+
+ /**
+ * Get list of killed tasks
+ *
+ * @return List<TaskAttemptInfo>
+ */
+ public final List<TaskAttemptInfo> getKilledTaskAttempts() {
+ return getTaskAttempts(TaskAttemptState.KILLED);
+ }
+
+ /**
+ * Get list of failed tasks
+ *
+ * @return List<TaskAttemptInfo>
+ */
+ public final List<TaskAttemptInfo> getSuccessfulTaskAttempts() {
+ return getTaskAttempts(TaskAttemptState.SUCCEEDED);
+ }
+
+ /**
+ * Get list of tasks belonging to a specific state
+ *
+ * @param state
+ * @return Collection<TaskAttemptInfo>
+ */
+ public final List<TaskAttemptInfo> getTaskAttempts(final TaskAttemptState state) {
+ return Collections.unmodifiableList(Lists.newLinkedList(Iterables.filter(Lists.newLinkedList
+ (attemptInfoMap.values()), new Predicate<TaskAttemptInfo>() {
+ @Override
+ public boolean apply(TaskAttemptInfo input) {
+ return input.getStatus() != null && input.getStatus().equals(state.toString());
+ }
+ }
+ )
+ )
+ );
+ }
+
+ /**
+ * Get the set of containers on which the task attempts ran for this task
+ *
+ * @return Multimap<Container, TaskAttemptInfo> task attempt details at container level
+ */
+ public final Multimap<Container, TaskAttemptInfo> getContainersMapping() {
+ Multimap<Container, TaskAttemptInfo> containerMapping = LinkedHashMultimap.create();
+ for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
+ containerMapping.put(attemptInfo.getContainer(), attemptInfo);
+ }
+ return Multimaps.unmodifiableMultimap(containerMapping);
+ }
+
+ /**
+ * Get the successful task attempt
+ *
+ * @return TaskAttemptInfo
+ */
+ public final TaskAttemptInfo getSuccessfulTaskAttempt() {
+ if (!Strings.isNullOrEmpty(getSuccessfulAttemptId())) {
+ for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
+ if (attemptInfo.getTaskAttemptId().equals(getSuccessfulAttemptId())) {
+ return attemptInfo;
+ }
+ }
+ }
+ // fall back to checking status if successful attempt id is not available
+ for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
+ if (attemptInfo.getStatus().equalsIgnoreCase(TaskAttemptState.SUCCEEDED.toString())) {
+ return attemptInfo;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Get last task attempt to finish
+ *
+ * @return TaskAttemptInfo
+ */
+ public final TaskAttemptInfo getLastTaskAttemptToFinish() {
+ List<TaskAttemptInfo> attemptsList = getTaskAttempts();
+ if (attemptsList.isEmpty()) {
+ return null;
+ }
+
+ return Ordering.from(new Comparator<TaskAttemptInfo>() {
+ @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+ return (o1.getFinishTimeInterval() < o2.getFinishTimeInterval()) ? -1 :
+ ((o1.getFinishTimeInterval() == o2.getFinishTimeInterval()) ?
+ 0 : 1);
+ }
+ }).max(attemptsList);
+ }
+
+ /**
+ * Get average task attempt duration. Includes succesful and failed tasks
+ *
+ * @return float
+ */
+ public final float getAvgTaskAttemptDuration() {
+ float totalTaskDuration = 0;
+ List<TaskAttemptInfo> attemptsList = getTaskAttempts();
+ if (attemptsList.size() == 0) {
+ return 0;
+ }
+ for (TaskAttemptInfo attemptInfo : attemptsList) {
+ totalTaskDuration += attemptInfo.getTimeTaken();
+ }
+ return ((totalTaskDuration * 1.0f) / attemptsList.size());
+ }
+
+ private Ordering<TaskAttemptInfo> orderingOnTimeTaken() {
+ return Ordering.from(new Comparator<TaskAttemptInfo>() {
+ @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+ return (o1.getTimeTaken() < o2.getTimeTaken()) ? -1 :
+ ((o1.getTimeTaken() == o2.getTimeTaken()) ?
+ 0 : 1);
+ }
+ });
+ }
+
+ private Ordering<TaskAttemptInfo> orderingOnAttemptStartTime() {
+ return Ordering.from(new Comparator<TaskAttemptInfo>() {
+ @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+ return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 :
+ ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ? 0 : 1);
+ }
+ });
+ }
+
+ /**
+ * Get min task attempt duration. This includes successful/failed task attempts as well
+ *
+ * @return long
+ */
+ public final long getMinTaskAttemptDuration() {
+ List<TaskAttemptInfo> attemptsList = getTaskAttempts();
+ if (attemptsList.isEmpty()) {
+ return 0;
+ }
+
+ return orderingOnTimeTaken().min(attemptsList).getTimeTaken();
+ }
+
+ /**
+ * Get max task attempt duration. This includes successful/failed task attempts as well
+ *
+ * @return long
+ */
+ public final long getMaxTaskAttemptDuration() {
+ List<TaskAttemptInfo> attemptsList = getTaskAttempts();
+ if (attemptsList.isEmpty()) {
+ return 0;
+ }
+
+ return orderingOnTimeTaken().max(attemptsList).getTimeTaken();
+ }
+
+ public final int getNumberOfTaskAttempts() {
+ return getTaskAttempts().size();
+ }
+
+ public final String getStatus() {
+ return status;
+ }
+
+ public final String getTaskId() {
+ return taskId;
+ }
+
+ public final long getTimeTaken() {
+ return getFinishTimeInterval() - getStartTimeInterval();
+ }
+
+ public final String getSuccessfulAttemptId() {
+ return successfulAttemptId;
+ }
+
+ public final long getAbsoluteScheduleTime() {
+ return scheduledTime;
+ }
+
+ public final long getScheduledTime() {
+ return scheduledTime - this.getVertexInfo().getDagInfo().getStartTime();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ sb.append("taskId=").append(getTaskId()).append(", ");
+ sb.append("scheduledTime=").append(getAbsoluteScheduleTime()).append(", ");
+ sb.append("startTime=").append(getStartTimeInterval()).append(", ");
+ sb.append("finishTime=").append(getFinishTimeInterval()).append(", ");
+ sb.append("timeTaken=").append(getTimeTaken()).append(", ");
+ sb.append("events=").append(getEvents()).append(", ");
+ sb.append("diagnostics=").append(getDiagnostics()).append(", ");
+ sb.append("successfulAttempId=").append(getSuccessfulAttemptId()).append(", ");
+ sb.append("status=").append(getStatus()).append(", ");
+ sb.append("vertexName=").append(getVertexInfo().getVertexName());
+ sb.append("]");
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VersionInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VersionInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VersionInfo.java
new file mode 100644
index 0000000..97d18cd
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VersionInfo.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
/**
 * Immutable value holder for version metadata: a build time, a source revision and a
 * version string. Values are stored exactly as given; nulls are allowed.
 */
public class VersionInfo {

  private final String version;
  private final String revision;
  private final String buildTime;

  public VersionInfo(String buildTime, String revision, String version) {
    this.version = version;
    this.revision = revision;
    this.buildTime = buildTime;
  }

  /** @return the build time string passed to the constructor */
  public String getBuildTime() {
    return this.buildTime;
  }

  /** @return the revision string passed to the constructor */
  public String getRevision() {
    return this.revision;
  }

  /** @return the version string passed to the constructor */
  public String getVersion() {
    return this.version;
  }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
new file mode 100644
index 0000000..50647fe
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
@@ -0,0 +1,636 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+
+import org.apache.hadoop.util.StringInterner;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Parsed representation of a single Tez vertex entity.
+ * <p>
+ * Scalar attributes (times, task counts, names, status) are read from the vertex JSON in
+ * the constructor. Tasks, edges and additional inputs/outputs are attached afterwards
+ * through the package-private mutators. The {@code *Interval} getters defined in this
+ * class subtract the owning DAG's start time, so {@link #setDagInfo(DagInfo)} must have
+ * been called before they are used.
+ */
+@Public
+@Evolving
+public class VertexInfo extends BaseInfo {
+
+  private final String vertexId;
+  private final String vertexName;
+  private final long finishTime;
+  private final long initTime;
+  private final long initRequestedTime;
+  private final long startTime;
+  private final long startRequestedTime;
+
+  private final String diagnostics;
+  private final String processorClass;
+
+  private final int numTasks;
+  private final int failedTasks;
+  private final int completedTasks;
+  private final int succeededTasks;
+  private final int killedTasks;
+  private final int numFailedTaskAttempts;
+
+  private final String status;
+
+  //TaskID --> TaskInfo for internal reference
+  private final Map<String, TaskInfo> taskInfoMap;
+
+  private final List<EdgeInfo> inEdgeList;
+  private final List<EdgeInfo> outEdgeList;
+
+  private final List<AdditionalInputOutputDetails> additionalInputInfoList;
+  private final List<AdditionalInputOutputDetails> additionalOutputInfoList;
+
+  // Lazily computed by getAvgPostDataExecutionTimeInterval(); -1 means "not computed yet".
+  private long avgPostDataExecutionTimeInterval = -1;
+
+  private DagInfo dagInfo;
+
+  /**
+   * Build a vertex from its history JSON entity.
+   *
+   * @param jsonObject vertex entity; its entity type must be the Tez vertex id type
+   * @throws JSONException if the entity type or the other-info node is missing
+   * @throws IllegalArgumentException if the entity is not a vertex
+   */
+  VertexInfo(JSONObject jsonObject) throws JSONException {
+    super(jsonObject);
+
+    Preconditions.checkArgument(
+        jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase(Constants.TEZ_VERTEX_ID),
+        "Expected entity of type %s", Constants.TEZ_VERTEX_ID);
+
+    vertexId = StringInterner.weakIntern(jsonObject.optString(Constants.ENTITY));
+    taskInfoMap = Maps.newHashMap();
+
+    inEdgeList = Lists.newLinkedList();
+    outEdgeList = Lists.newLinkedList();
+    additionalInputInfoList = Lists.newLinkedList();
+    additionalOutputInfoList = Lists.newLinkedList();
+
+    //Parse additional Info; absent numeric fields default to 0, strings to ""
+    JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
+    initRequestedTime = otherInfoNode.optLong(Constants.INIT_REQUESTED_TIME);
+    startRequestedTime = otherInfoNode.optLong(Constants.START_REQUESTED_TIME);
+    startTime = otherInfoNode.optLong(Constants.START_TIME);
+    initTime = otherInfoNode.optLong(Constants.INIT_TIME);
+    finishTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+    diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
+    numTasks = otherInfoNode.optInt(Constants.NUM_TASKS);
+    failedTasks = otherInfoNode.optInt(Constants.NUM_FAILED_TASKS);
+    succeededTasks = otherInfoNode.optInt(Constants.NUM_SUCCEEDED_TASKS);
+    completedTasks = otherInfoNode.optInt(Constants.NUM_COMPLETED_TASKS);
+    killedTasks = otherInfoNode.optInt(Constants.NUM_KILLED_TASKS);
+    numFailedTaskAttempts = otherInfoNode.optInt(Constants.NUM_FAILED_TASKS_ATTEMPTS);
+    vertexName = StringInterner.weakIntern(otherInfoNode.optString(Constants.VERTEX_NAME));
+    processorClass =
+        StringInterner.weakIntern(otherInfoNode.optString(Constants.PROCESSOR_CLASS_NAME));
+    status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS));
+  }
+
+  public static VertexInfo create(JSONObject vertexInfoObject) throws
+      JSONException {
+    return new VertexInfo(vertexInfoObject);
+  }
+
+  /**
+   * Update edge details with source and destination vertex objects.
+   * Only runs once the DAG holds as many vertices as it declares, so that every
+   * edge endpoint can be resolved by name.
+   */
+  private void updateEdgeInfo() {
+    if (dagInfo.getNumVertices() == dagInfo.getVertices().size()) {
+      //We can update EdgeInfo when all vertices are parsed
+      Map<String, VertexInfo> vertexMapping = dagInfo.getVertexMapping();
+      for (EdgeInfo edge : dagInfo.getEdges()) {
+        VertexInfo sourceVertex = vertexMapping.get(edge.getInputVertexName());
+        VertexInfo destinationVertex = vertexMapping.get(edge.getOutputVertexName());
+        edge.setSourceVertex(sourceVertex);
+        edge.setDestinationVertex(destinationVertex);
+      }
+    }
+  }
+
+  void addTaskInfo(TaskInfo taskInfo) {
+    this.taskInfoMap.put(taskInfo.getTaskId(), taskInfo);
+  }
+
+  void setAdditionalInputInfoList(List<AdditionalInputOutputDetails> additionalInputInfoList) {
+    this.additionalInputInfoList.clear();
+    this.additionalInputInfoList.addAll(additionalInputInfoList);
+  }
+
+  void setAdditionalOutputInfoList(List<AdditionalInputOutputDetails> additionalOutputInfoList) {
+    this.additionalOutputInfoList.clear();
+    this.additionalOutputInfoList.addAll(additionalOutputInfoList);
+  }
+
+  void addInEdge(EdgeInfo edgeInfo) {
+    this.inEdgeList.add(edgeInfo);
+  }
+
+  void addOutEdge(EdgeInfo edgeInfo) {
+    this.outEdgeList.add(edgeInfo);
+  }
+
+  /**
+   * Attach this vertex to its DAG, register it there, and resolve edge endpoints
+   * if all of the DAG's vertices are now known.
+   *
+   * @param dagInfo owning DAG; must not be null
+   */
+  void setDagInfo(DagInfo dagInfo) {
+    Preconditions.checkArgument(dagInfo != null, "Provide valid dagInfo");
+    this.dagInfo = dagInfo;
+    //link vertex to dagInfo
+    dagInfo.addVertexInfo(this);
+    updateEdgeInfo();
+  }
+
+  public List<AdditionalInputOutputDetails> getAdditionalInputInfoList() {
+    return Collections.unmodifiableList(additionalInputInfoList);
+  }
+
+  public List<AdditionalInputOutputDetails> getAdditionalOutputInfoList() {
+    return Collections.unmodifiableList(additionalOutputInfoList);
+  }
+
+  /** @return vertex start time minus the DAG start time */
+  @Override
+  public final long getStartTimeInterval() {
+    return startTime - (dagInfo.getStartTime());
+  }
+
+  /** @return start interval of the earliest-starting task, or 0 if there are no tasks */
+  public final long getFirstTaskStartTimeInterval() {
+    TaskInfo firstTask = getFirstTaskToStart();
+    if (firstTask == null) {
+      return 0;
+    }
+    return firstTask.getStartTimeInterval();
+  }
+
+  /**
+   * @return finish interval of the last task to finish; falls back to the DAG's finish
+   * interval when there are no tasks or that task's finish interval is negative
+   */
+  public final long getLastTaskFinishTimeInterval() {
+    // compute once: each lookup scans all tasks
+    TaskInfo lastTask = getLastTaskToFinish();
+    if (lastTask == null || lastTask.getFinishTimeInterval() < 0) {
+      return dagInfo.getFinishTimeInterval();
+    }
+    return lastTask.getFinishTimeInterval();
+  }
+
+  /**
+   * Average post-data execution interval over the successful attempts of this vertex's
+   * tasks, ignoring negative values. The result is cached once computed; if no attempt
+   * qualifies, -1 is returned (and the computation is retried on the next call).
+   *
+   * @return rounded average, or -1 when no successful attempt has a non-negative value
+   */
+  public final long getAvgPostDataExecutionTimeInterval() {
+    if (avgPostDataExecutionTimeInterval == -1) {
+      long totalExecutionTime = 0;
+      long totalAttempts = 0;
+      for (TaskInfo task : taskInfoMap.values()) {
+        TaskAttemptInfo attempt = task.getSuccessfulTaskAttempt();
+        if (attempt != null) {
+          // count only time after last data was received
+          long execTime = attempt.getPostDataExecutionTimeInterval();
+          if (execTime >= 0) {
+            totalExecutionTime += execTime;
+            totalAttempts++;
+          }
+        }
+      }
+      if (totalAttempts > 0) {
+        avgPostDataExecutionTimeInterval = Math.round(totalExecutionTime * 1.0 / totalAttempts);
+      }
+    }
+    return avgPostDataExecutionTimeInterval;
+  }
+
+  public final long getStartTime() {
+    return startTime;
+  }
+
+  public final long getFinishTime() {
+    return finishTime;
+  }
+
+  public final long getInitTime() {
+    return initTime;
+  }
+
+  public final long getInitRequestedTime() {
+    return initRequestedTime;
+  }
+
+  public final long getStartRequestedTime() {
+    return startRequestedTime;
+  }
+
+  /**
+   * @return vertex finish time minus the DAG start time. If that is negative (e.g. the
+   * finish time was never recorded), the largest task finish interval is returned instead,
+   * or the negative value itself when no task has a larger one.
+   */
+  @Override
+  public final long getFinishTimeInterval() {
+    long vertexEndTime = finishTime - (dagInfo.getStartTime());
+    if (vertexEndTime < 0) {
+      //probably vertex is not complete or failed in middle. get the last task attempt time
+      for (TaskInfo taskInfo : taskInfoMap.values()) {
+        vertexEndTime = Math.max(vertexEndTime, taskInfo.getFinishTimeInterval());
+      }
+    }
+    return vertexEndTime;
+  }
+
+  @Override
+  public final String getDiagnostics() {
+    return diagnostics;
+  }
+
+  public final String getVertexName() {
+    return vertexName;
+  }
+
+  public final String getVertexId() {
+    return vertexId;
+  }
+
+  //Quite possible that getFinishTime is not yet recorded for failed vertices (or killed vertices)
+  //Start time of vertex infers that the dependencies are done and AM has inited it.
+  public final long getTimeTaken() {
+    return (getFinishTimeInterval() - getStartTimeInterval());
+  }
+
+  //Time taken for last task to finish - time taken for first task to start
+  public final long getTimeTakenForTasks() {
+    return (getLastTaskFinishTimeInterval() - getFirstTaskStartTimeInterval());
+  }
+
+  /** @return vertex init time minus the DAG start time */
+  public final long getInitTimeInterval() {
+    return initTime - dagInfo.getStartTime();
+  }
+
+  public final int getNumTasks() {
+    return numTasks;
+  }
+
+  public final int getFailedTasksCount() {
+    return failedTasks;
+  }
+
+  public final int getKilledTasksCount() {
+    return killedTasks;
+  }
+
+  public final int getCompletedTasksCount() {
+    return completedTasks;
+  }
+
+  public final int getSucceededTasksCount() {
+    return succeededTasks;
+  }
+
+  public final int getNumFailedTaskAttemptsCount() {
+    return numFailedTaskAttempts;
+  }
+
+  public final String getProcessorClassName() {
+    return processorClass;
+  }
+
+  /** @return a fresh mutable copy of the tasks, in map iteration order */
+  private List<TaskInfo> getTasksInternal() {
+    return Lists.newLinkedList(taskInfoMap.values());
+  }
+
+  /**
+   * Get all tasks
+   *
+   * @return unmodifiable list of TaskInfo
+   */
+  public final List<TaskInfo> getTasks() {
+    return Collections.unmodifiableList(getTasksInternal());
+  }
+
+  /**
+   * Get all tasks, optionally sorted
+   *
+   * @param sorted   whether to sort the result
+   * @param ordering ordering to use when sorting; null means ascending start time
+   * @return unmodifiable list of TaskInfo
+   */
+  public final List<TaskInfo> getTasks(boolean sorted, @Nullable Ordering<TaskInfo> ordering) {
+    List<TaskInfo> taskInfoList = getTasksInternal();
+    if (sorted) {
+      Collections.sort(taskInfoList, ((ordering == null) ? orderingOnStartTime() : ordering));
+    }
+    return Collections.unmodifiableList(taskInfoList);
+  }
+
+  /**
+   * Get list of failed tasks
+   *
+   * @return List<TaskInfo>
+   */
+  public final List<TaskInfo> getFailedTasks() {
+    return getTasks(TaskState.FAILED);
+  }
+
+  /**
+   * Get list of killed tasks
+   *
+   * @return List<TaskInfo>
+   */
+  public final List<TaskInfo> getKilledTasks() {
+    return getTasks(TaskState.KILLED);
+  }
+
+  /**
+   * Get list of successful tasks
+   *
+   * @return List<TaskInfo>
+   */
+  public final List<TaskInfo> getSuccessfulTasks() {
+    return getTasks(TaskState.SUCCEEDED);
+  }
+
+  /**
+   * Get list of tasks whose status string equals {@code state.toString()}
+   *
+   * @param state task state to match
+   * @return unmodifiable List<TaskInfo>
+   */
+  public final List<TaskInfo> getTasks(final TaskState state) {
+    Iterable<TaskInfo> matching = Iterables.filter(taskInfoMap.values(),
+        new Predicate<TaskInfo>() {
+          @Override
+          public boolean apply(TaskInfo input) {
+            return input.getStatus() != null && input.getStatus().equals(state.toString());
+          }
+        });
+    return Collections.unmodifiableList(Lists.newLinkedList(matching));
+  }
+
+  /**
+   * Get source vertices for this vertex
+   *
+   * @return List<VertexInfo> list of incoming vertices to this vertex (entries are null
+   * until edge endpoints have been resolved)
+   */
+  public final List<VertexInfo> getInputVertices() {
+    List<VertexInfo> inputVertices = Lists.newLinkedList();
+    for (EdgeInfo edge : inEdgeList) {
+      inputVertices.add(edge.getSourceVertex());
+    }
+    return Collections.unmodifiableList(inputVertices);
+  }
+
+  /**
+   * Get destination vertices for this vertex
+   *
+   * @return List<VertexInfo> list of output vertices (entries are null until edge
+   * endpoints have been resolved)
+   */
+  public final List<VertexInfo> getOutputVertices() {
+    List<VertexInfo> outputVertices = Lists.newLinkedList();
+    for (EdgeInfo edge : outEdgeList) {
+      outputVertices.add(edge.getDestinationVertex());
+    }
+    return Collections.unmodifiableList(outputVertices);
+  }
+
+  // expensive method to call for large DAGs as it creates big lists on every call
+  private List<TaskAttemptInfo> getTaskAttemptsInternal() {
+    List<TaskAttemptInfo> taskAttemptInfos = Lists.newLinkedList();
+    for (TaskInfo taskInfo : taskInfoMap.values()) {
+      taskAttemptInfos.addAll(taskInfo.getTaskAttempts());
+    }
+    return taskAttemptInfos;
+  }
+
+  /**
+   * Get all task attempts
+   *
+   * @return List<TaskAttemptInfo> list of attempts
+   */
+  public List<TaskAttemptInfo> getTaskAttempts() {
+    return Collections.unmodifiableList(getTaskAttemptsInternal());
+  }
+
+  /**
+   * Get all task attempts, optionally sorted
+   *
+   * @param sorted   whether to sort the result
+   * @param ordering ordering to use when sorting; null means ascending start time
+   * @return unmodifiable list of TaskAttemptInfo
+   */
+  public final List<TaskAttemptInfo> getTaskAttempts(boolean sorted,
+      @Nullable Ordering<TaskAttemptInfo> ordering) {
+    List<TaskAttemptInfo> taskAttemptInfos = getTaskAttemptsInternal();
+    if (sorted) {
+      Collections.sort(taskAttemptInfos,
+          ((ordering == null) ? orderingOnAttemptStartTime() : ordering));
+    }
+    return Collections.unmodifiableList(taskAttemptInfos);
+  }
+
+  /** @return the task with the given id, or null if unknown */
+  public final TaskInfo getTask(String taskId) {
+    return taskInfoMap.get(taskId);
+  }
+
+  /**
+   * Get incoming edge information for this vertex
+   *
+   * @return List<EdgeInfo> list of input edges on this vertex
+   */
+  public final List<EdgeInfo> getInputEdges() {
+    return Collections.unmodifiableList(inEdgeList);
+  }
+
+  /**
+   * Get outgoing edge information for this vertex
+   *
+   * @return List<EdgeInfo> list of output edges on this vertex
+   */
+  public final List<EdgeInfo> getOutputEdges() {
+    return Collections.unmodifiableList(outEdgeList);
+  }
+
+  /** @return unmodifiable mapping from container to the attempts that ran in it */
+  public final Multimap<Container, TaskAttemptInfo> getContainersMapping() {
+    Multimap<Container, TaskAttemptInfo> containerMapping = LinkedHashMultimap.create();
+    for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
+      containerMapping.put(attemptInfo.getContainer(), attemptInfo);
+    }
+    return Multimaps.unmodifiableMultimap(containerMapping);
+  }
+
+  /**
+   * Get first task to start (smallest start interval; first encountered on ties)
+   *
+   * @return TaskInfo, or null if the vertex has no tasks
+   */
+  public final TaskInfo getFirstTaskToStart() {
+    if (taskInfoMap.isEmpty()) {
+      return null;
+    }
+    return orderingOnStartTime().min(taskInfoMap.values());
+  }
+
+  /**
+   * Get last task to finish (largest finish interval; first encountered on ties)
+   *
+   * @return TaskInfo, or null if the vertex has no tasks
+   */
+  public final TaskInfo getLastTaskToFinish() {
+    if (taskInfoMap.isEmpty()) {
+      return null;
+    }
+    // Previously sorted with a comparator that mixed finish and start times, which picked
+    // the wrong task on ties and could violate the Comparator contract.
+    return orderingOnFinishTime().max(taskInfoMap.values());
+  }
+
+  /**
+   * Get average task duration
+   *
+   * @return mean of TaskInfo#getTimeTaken() over all tasks, or 0 if there are no tasks
+   */
+  public final float getAvgTaskDuration() {
+    if (taskInfoMap.isEmpty()) {
+      return 0;
+    }
+    // accumulate in a long: a float accumulator loses precision past 2^24 ms (~4.6 hours)
+    long totalTaskDuration = 0;
+    for (TaskInfo taskInfo : taskInfoMap.values()) {
+      totalTaskDuration += taskInfo.getTimeTaken();
+    }
+    return (float) ((double) totalTaskDuration / taskInfoMap.size());
+  }
+
+  /**
+   * Get min task duration in vertex
+   *
+   * @return long, 0 if there are no tasks
+   */
+  public final long getMinTaskDuration() {
+    TaskInfo taskInfo = getMinTaskDurationTask();
+    return (taskInfo != null) ? taskInfo.getTimeTaken() : 0;
+  }
+
+  /**
+   * Get max task duration in vertex
+   *
+   * @return long, 0 if there are no tasks
+   */
+  public final long getMaxTaskDuration() {
+    TaskInfo taskInfo = getMaxTaskDurationTask();
+    return (taskInfo != null) ? taskInfo.getTimeTaken() : 0;
+  }
+
+  private Ordering<TaskInfo> orderingOnTimeTaken() {
+    return Ordering.from(new Comparator<TaskInfo>() {
+      @Override
+      public int compare(TaskInfo o1, TaskInfo o2) {
+        return (o1.getTimeTaken() < o2.getTimeTaken()) ? -1 :
+            ((o1.getTimeTaken() == o2.getTimeTaken()) ? 0 : 1);
+      }
+    });
+  }
+
+  private Ordering<TaskInfo> orderingOnStartTime() {
+    return Ordering.from(new Comparator<TaskInfo>() {
+      @Override
+      public int compare(TaskInfo o1, TaskInfo o2) {
+        return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 :
+            ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ? 0 : 1);
+      }
+    });
+  }
+
+  // Ascending by finish interval; consistent (same key on every branch).
+  private Ordering<TaskInfo> orderingOnFinishTime() {
+    return Ordering.from(new Comparator<TaskInfo>() {
+      @Override
+      public int compare(TaskInfo o1, TaskInfo o2) {
+        return (o1.getFinishTimeInterval() < o2.getFinishTimeInterval()) ? -1 :
+            ((o1.getFinishTimeInterval() == o2.getFinishTimeInterval()) ? 0 : 1);
+      }
+    });
+  }
+
+  private Ordering<TaskAttemptInfo> orderingOnAttemptStartTime() {
+    return Ordering.from(new Comparator<TaskAttemptInfo>() {
+      @Override
+      public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+        return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 :
+            ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ? 0 : 1);
+      }
+    });
+  }
+
+  /**
+   * Get the task with the shortest duration in vertex
+   *
+   * @return TaskInfo, or null if the vertex has no tasks
+   */
+  public final TaskInfo getMinTaskDurationTask() {
+    if (taskInfoMap.isEmpty()) {
+      return null;
+    }
+    return orderingOnTimeTaken().min(taskInfoMap.values());
+  }
+
+  /**
+   * Get the task with the longest duration in vertex
+   *
+   * @return TaskInfo, or null if the vertex has no tasks
+   */
+  public final TaskInfo getMaxTaskDurationTask() {
+    if (taskInfoMap.isEmpty()) {
+      return null;
+    }
+    return orderingOnTimeTaken().max(taskInfoMap.values());
+  }
+
+  public final String getStatus() {
+    return status;
+  }
+
+  public final DagInfo getDagInfo() {
+    return dagInfo;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("vertexName=").append(getVertexName()).append(", ");
+    sb.append("events=").append(getEvents()).append(", ");
+    sb.append("initTime=").append(getInitTimeInterval()).append(", ");
+    sb.append("startTime=").append(getStartTimeInterval()).append(", ");
+    sb.append("endTime=").append(getFinishTimeInterval()).append(", ");
+    sb.append("timeTaken=").append(getTimeTaken()).append(", ");
+    sb.append("diagnostics=").append(getDiagnostics()).append(", ");
+    sb.append("numTasks=").append(getNumTasks()).append(", ");
+    sb.append("processorClassName=").append(getProcessorClassName()).append(", ");
+    sb.append("numCompletedTasks=").append(getCompletedTasksCount()).append(", ");
+    sb.append("numFailedTaskAttempts=").append(getNumFailedTaskAttemptsCount()).append(", ");
+    sb.append("numSucceededTasks=").append(getSucceededTasksCount()).append(", ");
+    // counts, not the task lists themselves
+    sb.append("numFailedTasks=").append(getFailedTasksCount()).append(", ");
+    sb.append("numKilledTasks=").append(getKilledTasksCount()).append(", ");
+    sb.append("tasksCount=").append(taskInfoMap.size()).append(", ");
+    sb.append("status=").append(getStatus());
+    sb.append("]");
+    return sb.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
new file mode 100644
index 0000000..ffb854a
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.utils;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.StringInterner;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.history.logging.EntityTypes;
+import org.apache.tez.history.parser.datamodel.Constants;
+import org.apache.tez.history.parser.datamodel.Event;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.List;
+
+/**
+ * Static helpers for turning history JSON fragments into model objects (counters,
+ * data-dependency events, events) and for a default console logging setup.
+ */
+@InterfaceAudience.Private
+public class Utils {
+
+  private static final String LOG4J_CONFIGURATION = "log4j.configuration";
+
+  /**
+   * Parse tez counters from json.
+   * <p>
+   * Counter groups or counter entries that are missing or not JSON objects are skipped
+   * instead of causing a NullPointerException; a group without a counters array is still
+   * added (empty).
+   *
+   * @param jsonObject counters node; may be null
+   * @return parsed counters; empty when {@code jsonObject} is null or has no groups
+   * @throws JSONException if a counter entry has a missing or malformed name/value
+   */
+  public static TezCounters parseTezCountersFromJSON(JSONObject jsonObject)
+      throws JSONException {
+    TezCounters counters = new TezCounters();
+
+    if (jsonObject == null) {
+      return counters; //empty counters.
+    }
+
+    final JSONArray counterGroupNodes = jsonObject.optJSONArray(Constants.COUNTER_GROUPS);
+    if (counterGroupNodes == null) {
+      return counters;
+    }
+
+    for (int i = 0; i < counterGroupNodes.length(); i++) {
+      JSONObject counterGroupNode = counterGroupNodes.optJSONObject(i);
+      if (counterGroupNode == null) {
+        continue; // not a JSON object; nothing to parse
+      }
+      final String groupName = counterGroupNode.optString(Constants.COUNTER_GROUP_NAME);
+      final String groupDisplayName = counterGroupNode.optString(
+          Constants.COUNTER_GROUP_DISPLAY_NAME, groupName);
+
+      CounterGroup group = counters.addGroup(groupName, groupDisplayName);
+
+      final JSONArray counterNodes = counterGroupNode.optJSONArray(Constants.COUNTERS);
+      if (counterNodes == null) {
+        continue; // group without counters: keep the (empty) group
+      }
+
+      //Parse counter nodes
+      for (int j = 0; j < counterNodes.length(); j++) {
+        JSONObject counterNode = counterNodes.optJSONObject(j);
+        if (counterNode == null) {
+          continue;
+        }
+        final String counterName = counterNode.getString(Constants.COUNTER_NAME);
+        final String counterDisplayName =
+            counterNode.optString(Constants.COUNTER_DISPLAY_NAME, counterName);
+        final long counterValue = counterNode.getLong(Constants.COUNTER_VALUE);
+        TezCounter counter = group.findCounter(
+            counterName,
+            counterDisplayName);
+        counter.setValue(counterValue);
+      }
+    }
+    return counters;
+  }
+
+  /**
+   * Parse the "last data events" array into data-dependency events.
+   *
+   * @param jsonObject node holding the events array; may be null
+   * @return list of events; empty when {@code jsonObject} is null or lacks the array
+   * @throws JSONException if an array entry is not a JSON object
+   */
+  public static List<DataDependencyEvent> parseDataEventDependencyFromJSON(JSONObject jsonObject)
+      throws JSONException {
+    List<DataDependencyEvent> events = Lists.newArrayList();
+    if (jsonObject == null) {
+      return events;
+    }
+    JSONArray fields = jsonObject.optJSONArray(Constants.LAST_DATA_EVENTS);
+    if (fields == null) {
+      return events; // optJSONArray returns null when the key is absent
+    }
+    for (int i = 0; i < fields.length(); i++) {
+      JSONObject eventMap = fields.getJSONObject(i);
+      events.add(new DataDependencyEvent(
+          StringInterner.weakIntern(eventMap.optString(EntityTypes.TEZ_TASK_ATTEMPT_ID.name())),
+          eventMap.optLong(Constants.TIMESTAMP)));
+    }
+    return events;
+  }
+
+  /**
+   * Parse events from json and append them to {@code eventList}.
+   * Entries that are not JSON objects are skipped.
+   *
+   * @param eventNodes array of event nodes; a null array is a no-op
+   * @param eventList  list the parsed events are appended to
+   * @throws JSONException declared for callers; not raised by the opt* lookups used here
+   */
+  public static void parseEvents(JSONArray eventNodes, List<Event> eventList) throws
+      JSONException {
+    if (eventNodes == null) {
+      return;
+    }
+    for (int i = 0; i < eventNodes.length(); i++) {
+      JSONObject eventNode = eventNodes.optJSONObject(i);
+      if (eventNode == null) {
+        continue;
+      }
+      final String eventInfo = eventNode.optString(Constants.EVENT_INFO);
+      final String eventType = eventNode.optString(Constants.EVENT_TYPE);
+      final long time = eventNode.optLong(Constants.EVENT_TIME_STAMP);
+
+      eventList.add(new Event(eventInfo, eventType, time));
+    }
+  }
+
+  /**
+   * If the {@code log4j.configuration} system property is unset or empty, attach a console
+   * appender (TTCC pattern) to the root logger and set its level to INFO.
+   */
+  public static void setupRootLogger() {
+    if (Strings.isNullOrEmpty(System.getProperty(LOG4J_CONFIGURATION))) {
+      //By default print to console with INFO level
+      Logger.getRootLogger().
+          addAppender(new ConsoleAppender(new PatternLayout(PatternLayout.TTCC_CONVERSION_PATTERN)));
+      Logger.getRootLogger().setLevel(Level.INFO);
+    }
+  }
+
+}