Posted to commits@tez.apache.org by rb...@apache.org on 2015/05/19 02:35:22 UTC
[2/2] tez git commit: TEZ-2076. Tez framework to extract/analyze data stored in ATS for specific dag (rbalamohan)
TEZ-2076. Tez framework to extract/analyze data stored in ATS for specific dag (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/11aa17e0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/11aa17e0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/11aa17e0
Branch: refs/heads/master
Commit: 11aa17e01f7d72bdea26db2188f8b5cb0c3d84ae
Parents: 12fc2c7
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Tue May 19 06:05:18 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Tue May 19 06:05:18 2015 +0530
----------------------------------------------------------------------
tez-plugins/pom.xml | 4 +
.../tez-history-parser/findbugs-exclude.xml | 16 +
tez-plugins/tez-history-parser/pom.xml | 190 +++++++
.../org/apache/tez/history/ATSImportTool.java | 474 ++++++++++++++++
.../org/apache/tez/history/parser/ATSData.java | 48 ++
.../tez/history/parser/ATSFileParser.java | 227 ++++++++
.../datamodel/AdditionalInputOutputDetails.java | 71 +++
.../tez/history/parser/datamodel/BaseInfo.java | 142 +++++
.../history/parser/datamodel/BaseParser.java | 114 ++++
.../tez/history/parser/datamodel/Constants.java | 59 ++
.../tez/history/parser/datamodel/Container.java | 70 +++
.../tez/history/parser/datamodel/DagInfo.java | 555 +++++++++++++++++++
.../tez/history/parser/datamodel/EdgeInfo.java | 112 ++++
.../tez/history/parser/datamodel/Event.java | 63 +++
.../parser/datamodel/TaskAttemptInfo.java | 252 +++++++++
.../tez/history/parser/datamodel/TaskInfo.java | 341 ++++++++++++
.../history/parser/datamodel/VersionInfo.java | 45 ++
.../history/parser/datamodel/VertexInfo.java | 536 ++++++++++++++++++
.../apache/tez/history/parser/utils/Utils.java | 121 ++++
.../apache/tez/history/TestATSFileParser.java | 545 ++++++++++++++++++
20 files changed, 3985 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/11aa17e0/tez-plugins/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/pom.xml b/tez-plugins/pom.xml
index 9b2a4cb..535007a 100644
--- a/tez-plugins/pom.xml
+++ b/tez-plugins/pom.xml
@@ -26,6 +26,10 @@
<artifactId>tez-plugins</artifactId>
<packaging>pom</packaging>
+ <modules>
+ <module>tez-history-parser</module>
+ </modules>
+
<profiles>
<profile>
<id>hadoop24</id>
http://git-wip-us.apache.org/repos/asf/tez/blob/11aa17e0/tez-plugins/tez-history-parser/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/findbugs-exclude.xml b/tez-plugins/tez-history-parser/findbugs-exclude.xml
new file mode 100644
index 0000000..5b11308
--- /dev/null
+++ b/tez-plugins/tez-history-parser/findbugs-exclude.xml
@@ -0,0 +1,16 @@
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<FindBugsFilter>
+
+</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/tez/blob/11aa17e0/tez-plugins/tez-history-parser/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/pom.xml b/tez-plugins/tez-history-parser/pom.xml
new file mode 100644
index 0000000..f12e0b4
--- /dev/null
+++ b/tez-plugins/tez-history-parser/pom.xml
@@ -0,0 +1,190 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-plugins</artifactId>
+ <version>0.8.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>tez-history-parser</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-tests</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-tests</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-yarn-timeline-history</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-yarn-timeline-history</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-tests</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.tez.history.ATSImportTool</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <archive>
+ <manifest>
+ <addClasspath>true</addClasspath>
+ <mainClass>org.apache.tez.history.ATSImportTool</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>assemble-all</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/11aa17e0/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
new file mode 100644
index 0000000..3545cec
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.MissingOptionException;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.directory.api.util.Strings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.history.parser.datamodel.Constants;
+import org.apache.tez.history.parser.utils.Utils;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MediaType;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * <pre>
+ * Simple tool which imports ATS data pertaining to a DAG (Dag, Vertex, Task, Attempt)
+ * and creates a zip file out of it.
+ *
+ * usage:
+ *
+ * java -cp tez-history-parser-x.y.z-jar-with-dependencies.jar org.apache.tez.history.ATSImportTool
+ *
+ * OR
+ *
+ * HADOOP_CLASSPATH=$TEZ_HOME/*:$TEZ_HOME/lib/*:$HADOOP_CLASSPATH hadoop jar tez-history-parser-x.y.z.jar org.apache.tez.history.ATSImportTool
+ *
+ *
+ * --yarnTimelineAddress <yarnTimelineAddress> Optional. Yarn Timeline Address (e.g. http://clusterATSNode:8188)
+ * --batchSize <batchSize> Optional. batch size for downloading data
+ * --dagId <dagId> DagId that needs to be downloaded
+ * --downloadDir <downloadDir> download directory where data needs to be downloaded
+ * --help print help
+ *
+ * </pre>
+ */
+@Evolving
+public class ATSImportTool extends Configured implements Tool {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ATSImportTool.class);
+
+ private static final String BATCH_SIZE = "batchSize";
+ private static final int BATCH_SIZE_DEFAULT = 100;
+
+ private static final String YARN_TIMELINE_SERVICE_ADDRESS = "yarnTimelineAddress";
+ private static final String DAG_ID = "dagId";
+ private static final String BASE_DOWNLOAD_DIR = "downloadDir";
+
+ private static final String HTTPS_SCHEME = "https://";
+ private static final String HTTP_SCHEME = "http://";
+
+ private static final String VERTEX_QUERY_STRING = "%s/%s?limit=%s&primaryFilter=%s:%s";
+ private static final String TASK_QUERY_STRING = "%s/%s?limit=%s&primaryFilter=%s:%s";
+ private static final String TASK_ATTEMPT_QUERY_STRING = "%s/%s?limit=%s&primaryFilter=%s:%s";
+ private static final String UTF8 = "UTF-8";
+
+ private static final String LINE_SEPARATOR = System.getProperty("line.separator");
+
+ private final int batchSize;
+ private final String baseUri;
+ private final String dagId;
+
+ private final File downloadDir;
+ private final File zipFile;
+ private final Client httpClient;
+
+ public ATSImportTool(String baseUri, String dagId, File baseDownloadDir, int batchSize)
+ throws TezException {
+ Preconditions.checkArgument(!Strings.isEmpty(dagId), "dagId can not be null or empty");
+ Preconditions.checkArgument(baseDownloadDir != null, "downloadDir can not be null");
+ Preconditions.checkArgument(TezDAGID.fromString(dagId) != null, "Not a valid DAG ID " + dagId);
+
+ this.baseUri = baseUri;
+ this.batchSize = batchSize;
+ this.dagId = dagId;
+
+ this.httpClient = getHttpClient();
+
+ this.downloadDir = new File(baseDownloadDir, dagId);
+ this.zipFile = new File(downloadDir, this.dagId + ".zip");
+
+ boolean result = downloadDir.mkdirs();
+ LOG.trace("Result of creating dir {}={}", downloadDir, result);
+ if (!downloadDir.exists()) {
+ throw new IllegalArgumentException("dir=" + downloadDir + " does not exist");
+ }
+
+ LOG.info("Using baseURL={}, dagId={}, batchSize={}, downloadDir={}", baseUri, dagId,
+ batchSize, downloadDir);
+ }
+
+ /**
+ * Download data from ATS for specific DAG
+ *
+ * @throws Exception
+ */
+ private void download() throws Exception {
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(zipFile, false);
+ ZipOutputStream zos = new ZipOutputStream(fos);
+ downloadData(zos);
+ IOUtils.closeQuietly(zos);
+ } catch (Exception e) {
+ LOG.error("Exception in download", e);
+ throw e;
+ } finally {
+ if (httpClient != null) {
+ httpClient.destroy();
+ }
+ IOUtils.closeQuietly(fos);
+ }
+ }
+
+ /**
+ * Download DAG data (DAG, Vertex, Task, TaskAttempts) from ATS and write to zip file
+ *
+ * @param zos
+ * @throws TezException
+ * @throws JSONException
+ * @throws IOException
+ */
+ private void downloadData(ZipOutputStream zos) throws TezException, JSONException, IOException {
+ JSONObject finalJson = new JSONObject();
+
+ //Download application details (TEZ_VERSION etc)
+ String tezAppId = "tez_" + (TezDAGID.fromString(dagId)).getApplicationId().toString();
+ String tezAppUrl = String.format("%s/%s/%s", baseUri, Constants.TEZ_APPLICATION, tezAppId);
+ JSONObject tezAppJson = getJsonRootEntity(tezAppUrl);
+ finalJson.put(Constants.APPLICATION, tezAppJson);
+
+ //Download dag
+ String dagUrl = String.format("%s/%s/%s", baseUri, Constants.TEZ_DAG_ID, dagId);
+ JSONObject dagRoot = getJsonRootEntity(dagUrl);
+ finalJson.put(Constants.DAG, dagRoot);
+
+ //Create a zip entry with dagId as its name.
+ ZipEntry zipEntry = new ZipEntry(dagId);
+ zos.putNextEntry(zipEntry);
+ //Write in formatted way
+ IOUtils.write(finalJson.toString(4), zos, UTF8);
+
+ //Download vertex
+ String vertexURL =
+ String.format(VERTEX_QUERY_STRING, baseUri,
+ Constants.TEZ_VERTEX_ID, batchSize, Constants.TEZ_DAG_ID, dagId);
+ downloadJSONArrayFromATS(vertexURL, zos, Constants.VERTICES);
+
+ //Download task
+ String taskURL = String.format(TASK_QUERY_STRING, baseUri,
+ Constants.TEZ_TASK_ID, batchSize, Constants.TEZ_DAG_ID, dagId);
+ downloadJSONArrayFromATS(taskURL, zos, Constants.TASKS);
+
+ //Download task attempts
+ String taskAttemptURL = String.format(TASK_ATTEMPT_QUERY_STRING, baseUri,
+ Constants.TEZ_TASK_ATTEMPT_ID, batchSize, Constants.TEZ_DAG_ID, dagId);
+ downloadJSONArrayFromATS(taskAttemptURL, zos, Constants.TASK_ATTEMPTS);
+ }
+
+ /**
+ * Download data from ATS in batches
+ *
+ * @param url
+ * @param zos
+ * @param tag
+ * @throws IOException
+ * @throws TezException
+ * @throws JSONException
+ */
+ private void downloadJSONArrayFromATS(String url, ZipOutputStream zos, String tag)
+ throws IOException, TezException, JSONException {
+
+ Preconditions.checkArgument(zos != null, "ZipOutputStream can not be null");
+
+ String baseUrl = url;
+ JSONArray entities;
+
+ long downloadedCount = 0;
+ while ((entities = getJsonRootEntity(url).optJSONArray(Constants.ENTITIES)) != null
+ && entities.length() > 0) {
+
+ int limit = (entities.length() >= batchSize) ? (entities.length() - 1) : entities.length();
+ LOG.debug("Limit={}, downloaded entities len={}", limit, entities.length());
+
+ //write downloaded part to zipfile. This is done to avoid any memory pressure when
+ // downloading and writing 1000s of tasks.
+ ZipEntry zipEntry = new ZipEntry("part-" + System.currentTimeMillis() + ".json");
+ zos.putNextEntry(zipEntry);
+ JSONObject finalJson = new JSONObject();
+ finalJson.put(tag, entities);
+ IOUtils.write(finalJson.toString(4), zos, "UTF-8");
+ downloadedCount += entities.length();
+
+ if (entities.length() < batchSize) {
+ break;
+ }
+
+ //Set the last item in entities as the fromId
+ url = baseUrl + "&fromId="
+ + entities.getJSONObject(entities.length() - 1).getString(Constants.ENTITY);
+
+ String firstItem = entities.getJSONObject(0).getString(Constants.ENTITY);
+ String lastItem = entities.getJSONObject(entities.length() - 1).getString(Constants.ENTITY);
+ LOG.info("Downloaded={}, First item={}, LastItem={}, new url={}", downloadedCount,
+ firstItem, lastItem, url);
+ }
+ }
+
+ private String logErrorMessage(ClientResponse response) throws IOException {
+ StringBuilder sb = new StringBuilder();
+ LOG.error("Response status={}", response.getClientResponseStatus().toString());
+ LineIterator it = null;
+ try {
+ it = IOUtils.lineIterator(response.getEntityInputStream(), UTF8);
+ while (it.hasNext()) {
+ String line = it.nextLine();
+ LOG.error(line);
+ sb.append(line).append(LINE_SEPARATOR);
+ }
+ } finally {
+ if (it != null) {
+ it.close();
+ }
+ }
+ return sb.toString();
+ }
+
+ //For a secure cluster, this should work as long as a valid ticket is available on the node.
+ private JSONObject getJsonRootEntity(String url) throws TezException, IOException {
+ try {
+ WebResource wr = getHttpClient().resource(url);
+ ClientResponse response = wr.accept(MediaType.APPLICATION_JSON_TYPE)
+ .type(MediaType.APPLICATION_JSON_TYPE)
+ .get(ClientResponse.class);
+
+ if (response.getClientResponseStatus() != ClientResponse.Status.OK) {
+ // In the case of a secure cluster, any auth exception sends the data back as
+ // an HTML page and JSON parsing could throw exceptions. Instead, read the stream contents
+ // completely and log them in case of error.
+ logErrorMessage(response);
+ throw new TezException("Failed to get response from YARN Timeline: url: " + url);
+ }
+ return response.getEntity(JSONObject.class);
+ } catch (ClientHandlerException e) {
+ throw new TezException("Error processing response from YARN Timeline. URL=" + url, e);
+ } catch (UniformInterfaceException e) {
+ throw new TezException("Error accessing content from YARN Timeline - unexpected response. "
+ + "URL=" + url, e);
+ } catch (IllegalArgumentException e) {
+ throw new TezException("Error accessing content from YARN Timeline - invalid url. URL=" + url,
+ e);
+ }
+ }
+
+ private Client getHttpClient() {
+ if (httpClient == null) {
+ ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class);
+ HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory();
+ return new Client(new URLConnectionClientHandler(urlFactory), config);
+ }
+ return httpClient;
+ }
+
+ static class PseudoAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory {
+ @Override
+ public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+ String tokenString = (url.getQuery() == null ? "?" : "&") + "user.name=" +
+ URLEncoder.encode(UserGroupInformation.getCurrentUser().getShortUserName(), "UTF8");
+ return (HttpURLConnection) (new URL(url.toString() + tokenString)).openConnection();
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ try {
+ download();
+ return 0;
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOG.error("Error occurred when downloading data ", e);
+ return -1;
+ }
+ }
+
+ private static Options buildOptions() {
+ Option dagIdOption = OptionBuilder.withArgName(DAG_ID).withLongOpt(DAG_ID)
+ .withDescription("DagId that needs to be downloaded").hasArg().isRequired(true).create();
+
+ Option downloadDirOption = OptionBuilder.withArgName(BASE_DOWNLOAD_DIR).withLongOpt
+ (BASE_DOWNLOAD_DIR)
+ .withDescription("Download directory where data needs to be downloaded").hasArg()
+ .isRequired(true).create();
+
+ Option atsAddressOption = OptionBuilder.withArgName(YARN_TIMELINE_SERVICE_ADDRESS).withLongOpt(
+ YARN_TIMELINE_SERVICE_ADDRESS)
+ .withDescription("Optional. ATS address (e.g http://clusterATSNode:8188)").hasArg()
+ .isRequired(false)
+ .create();
+
+ Option batchSizeOption = OptionBuilder.withArgName(BATCH_SIZE).withLongOpt(BATCH_SIZE)
+ .withDescription("Optional. batch size for downloading data").hasArg()
+ .isRequired(false)
+ .create();
+
+ Option help = OptionBuilder.withArgName("help").withLongOpt("help")
+ .withDescription("print help").isRequired(false).create();
+
+ Options opts = new Options();
+ opts.addOption(dagIdOption);
+ opts.addOption(downloadDirOption);
+ opts.addOption(atsAddressOption);
+ opts.addOption(batchSizeOption);
+ opts.addOption(help);
+ return opts;
+ }
+
+ static void printHelp(Options options) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.setWidth(240);
+ String help = LINE_SEPARATOR
+ + "java -cp tez-history-parser-x.y.z-jar-with-dependencies.jar org.apache.tez.history.ATSImportTool"
+ + LINE_SEPARATOR
+ + "OR"
+ + LINE_SEPARATOR
+ + "HADOOP_CLASSPATH=$TEZ_HOME/*:$TEZ_HOME/lib/*:$HADOOP_CLASSPATH hadoop jar "
+ + "tez-history-parser-x.y.z.jar " + ATSImportTool.class.getName()
+ + LINE_SEPARATOR;
+ formatter.printHelp(240, help, "Options", options, "", true);
+ }
+
+ static boolean hasHttpsPolicy(Configuration conf) {
+ YarnConfiguration yarnConf = new YarnConfiguration(conf);
+ return (HttpConfig.Policy.HTTPS_ONLY == HttpConfig.Policy.fromString(yarnConf
+ .get(YarnConfiguration.YARN_HTTP_POLICY_KEY, YarnConfiguration.YARN_HTTP_POLICY_DEFAULT)));
+ }
+
+ static String getBaseTimelineURL(String yarnTimelineAddress, Configuration conf)
+ throws TezException {
+ boolean isHttps = hasHttpsPolicy(conf);
+
+ if (yarnTimelineAddress == null) {
+ if (isHttps) {
+ yarnTimelineAddress = conf.get(Constants.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS_CONF_NAME);
+ } else {
+ yarnTimelineAddress = conf.get(Constants.TIMELINE_SERVICE_WEBAPP_HTTP_ADDRESS_CONF_NAME);
+ }
+ Preconditions.checkArgument(!Strings.isEmpty(yarnTimelineAddress), "Yarn timeline address can"
+ + " not be empty. Please check configurations.");
+ } else {
+ yarnTimelineAddress = yarnTimelineAddress.trim();
+ Preconditions.checkArgument(!Strings.isEmpty(yarnTimelineAddress), "Yarn timeline address can"
+ + " not be empty. Please provide valid url with --" +
+ YARN_TIMELINE_SERVICE_ADDRESS + " option");
+ }
+
+ yarnTimelineAddress = yarnTimelineAddress.toLowerCase();
+ if (!yarnTimelineAddress.startsWith(HTTP_SCHEME)
+ && !yarnTimelineAddress.startsWith(HTTPS_SCHEME)) {
+ yarnTimelineAddress = ((isHttps) ? HTTPS_SCHEME : HTTP_SCHEME) + yarnTimelineAddress;
+ }
+
+ try {
+ yarnTimelineAddress = new URI(yarnTimelineAddress).normalize().toString().trim();
+ yarnTimelineAddress = (yarnTimelineAddress.endsWith("/")) ?
+ yarnTimelineAddress.substring(0, yarnTimelineAddress.length() - 1) :
+ yarnTimelineAddress;
+ } catch (URISyntaxException e) {
+ throw new TezException("Please provide a valid URL. url=" + yarnTimelineAddress, e);
+ }
+
+ return Joiner.on("").join(yarnTimelineAddress, Constants.RESOURCE_URI_BASE);
+ }
+
+ @VisibleForTesting
+ static int process(String[] args) {
+ Options options = buildOptions();
+ int result = -1;
+ try {
+ Configuration conf = new Configuration();
+ CommandLine cmdLine = new GnuParser().parse(options, args);
+ String dagId = cmdLine.getOptionValue(DAG_ID);
+
+ File downloadDir = new File(cmdLine.getOptionValue(BASE_DOWNLOAD_DIR));
+
+ String yarnTimelineAddress = cmdLine.getOptionValue(YARN_TIMELINE_SERVICE_ADDRESS);
+ String baseTimelineURL = getBaseTimelineURL(yarnTimelineAddress, conf);
+
+ int batchSize = (cmdLine.hasOption(BATCH_SIZE)) ?
+ (Integer.parseInt(cmdLine.getOptionValue(BATCH_SIZE))) : BATCH_SIZE_DEFAULT;
+
+ result = ToolRunner.run(conf, new ATSImportTool(baseTimelineURL, dagId,
+ downloadDir, batchSize), args);
+
+ return result;
+ } catch (MissingOptionException missingOptionException) {
+ LOG.error("Error in parsing options ", missingOptionException);
+ printHelp(options);
+ } catch (ParseException e) {
+ LOG.error("Error in parsing options ", e);
+ printHelp(options);
+ } catch (Exception e) {
+ LOG.error("Error in processing ", e);
+ throw e;
+ } finally {
+ return result;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Utils.setupRootLogger();
+ int res = process(args);
+ System.exit(res);
+ }
+}
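
For reference, a minimal sketch of driving the import programmatically instead of via the command line shown in the class Javadoc. It is not part of the patch; the timeline address, dag id, download directory and batch size are placeholders, and the example class is placed in the same package only so it can reuse the package-private getBaseTimelineURL helper.

    package org.apache.tez.history;

    import java.io.File;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class AtsImportExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholders: point these at your cluster and DAG.
        String timelineAddress = "http://clusterATSNode:8188";
        String dagId = "dag_1432168988000_0001_1";
        File downloadDir = new File("/tmp/ats-data");

        // Resolve the timeline REST base URL the same way the CLI path does,
        // then run the tool; the output lands in <downloadDir>/<dagId>/<dagId>.zip.
        String baseUrl = ATSImportTool.getBaseTimelineURL(timelineAddress, conf);
        int rc = ToolRunner.run(conf,
            new ATSImportTool(baseUrl, dagId, downloadDir, 100 /* default batch size */), args);
        System.exit(rc);
      }
    }
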
http://git-wip-us.apache.org/repos/asf/tez/blob/11aa17e0/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSData.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSData.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSData.java
new file mode 100644
index 0000000..f504007
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSData.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser;
+
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.codehaus.jettison.json.JSONException;
+
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Main interface to pull data from ATS.
+ * <p/>
+ * It is possible to have the ATS data stored in any DB (e.g. LevelDB or HBase). Depending on
+ * the store, there can be multiple implementations to pull data from these stores and create
+ * the DagInfo object for analysis.
+ * <p/>
+ */
+@Evolving
+public interface ATSData {
+
+ /**
+ * Get the DAG representation for processing
+ *
+ * @param dagId
+ * @return DagInfo
+ * @throws JSONException
+ * @throws TezException
+ */
+ public DagInfo getDAGData(String dagId) throws TezException;
+
+}
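
To illustrate the pluggability described in the Javadoc above, analysis code can depend only on the ATSData interface and stay agnostic of the backing store. A minimal sketch follows; the loader class, dag id and zip path are hypothetical, and ATSFileParser (added later in this patch) is used as the concrete implementation.

    import java.io.File;

    import org.apache.tez.dag.api.TezException;
    import org.apache.tez.history.parser.ATSData;
    import org.apache.tez.history.parser.ATSFileParser;
    import org.apache.tez.history.parser.datamodel.DagInfo;

    public class DagLoader {
      // Any store-backed implementation (a zip file today, LevelDB/HBase readers in the
      // future) can be passed in without changing the analysis code.
      static DagInfo load(ATSData source, String dagId) throws TezException {
        return source.getDAGData(dagId);
      }

      public static void main(String[] args) throws TezException {
        String dagId = "dag_1432168988000_0001_1";                            // placeholder
        File zip = new File("/tmp/ats-data/" + dagId + "/" + dagId + ".zip"); // placeholder path
        DagInfo dag = load(new ATSFileParser(zip), dagId);
        System.out.println("DAG start time: " + dag.getStartTime());
      }
    }
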
http://git-wip-us.apache.org/repos/asf/tez/blob/11aa17e0/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
new file mode 100644
index 0000000..aae20eb
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.BaseParser;
+import org.apache.tez.history.parser.datamodel.Constants;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VersionInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Enumeration;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Simple class to parse ATS zip file of a DAG and generate the relevant in-memory structure
+ * (DagInfo) necessary for processing later.
+ */
+
+@Public
+@Evolving
+public class ATSFileParser extends BaseParser implements ATSData {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ATSFileParser.class);
+
+ private final File atsZipFile;
+
+ public ATSFileParser(File atsZipFile) throws TezException {
+ super();
+ Preconditions.checkArgument(atsZipFile.exists(), "Zipfile " + atsZipFile + " does not exist");
+ this.atsZipFile = atsZipFile;
+ }
+
+ @Override
+ public DagInfo getDAGData(String dagId) throws TezException {
+ try {
+ parseATSZipFile(atsZipFile);
+
+ linkParsedContents();
+
+ return dagInfo;
+ } catch (IOException e) {
+ LOG.error("Error in reading DAG ", e);
+ throw new TezException(e);
+ } catch (JSONException e) {
+ LOG.error("Error in parsing DAG ", e);
+ throw new TezException(e);
+ } catch (InterruptedException e) {
+ throw new TezException(e);
+ }
+ }
+
+ /**
+ * Parse vertices json
+ *
+ * @param verticesJson
+ * @throws JSONException
+ */
+ private void processVertices(JSONArray verticesJson) throws JSONException {
+ //Process vertex information
+ Preconditions.checkState(verticesJson != null, "Vertex json can not be null");
+ if (verticesJson != null) {
+ LOG.info("Started parsing vertex");
+ for (int i = 0; i < verticesJson.length(); i++) {
+ VertexInfo vertexInfo = VertexInfo.create(verticesJson.getJSONObject(i));
+ vertexList.add(vertexInfo);
+ }
+ LOG.info("Finished parsing vertex");
+ }
+ }
+
+ /**
+ * Parse Tasks json
+ *
+ * @param tasksJson
+ * @throws JSONException
+ */
+ private void processTasks(JSONArray tasksJson) throws JSONException {
+ //Process Task information
+ Preconditions.checkState(tasksJson != null, "Task json can not be null");
+ if (tasksJson != null) {
+ LOG.debug("Started parsing task");
+ for (int i = 0; i < tasksJson.length(); i++) {
+ TaskInfo taskInfo = TaskInfo.create(tasksJson.getJSONObject(i));
+ taskList.add(taskInfo);
+ }
+ LOG.debug("Finished parsing task");
+ }
+ }
+
+ /**
+ * Parse TaskAttempt json
+ *
+ * @param taskAttemptsJson
+ * @throws JSONException
+ */
+ private void processAttempts(JSONArray taskAttemptsJson) throws JSONException {
+ //Process TaskAttempt information
+ Preconditions.checkState(taskAttemptsJson != null, "Attempts json can not be null");
+ if (taskAttemptsJson != null) {
+ LOG.debug("Started parsing task attempts");
+ for (int i = 0; i < taskAttemptsJson.length(); i++) {
+ TaskAttemptInfo attemptInfo = TaskAttemptInfo.create(taskAttemptsJson.getJSONObject(i));
+ attemptList.add(attemptInfo);
+ }
+ LOG.debug("Finished parsing task attempts");
+ }
+ }
+
+ /**
+ * Parse TezApplication json
+ *
+ * @param tezApplicationJson
+ * @throws JSONException
+ */
+ private void processApplication(JSONObject tezApplicationJson) throws JSONException {
+ if (tezApplicationJson != null) {
+ LOG.debug("Started parsing tez application");
+ JSONObject otherInfoNode = tezApplicationJson.optJSONObject(Constants.OTHER_INFO);
+ if (otherInfoNode != null) {
+ JSONObject tezVersion = otherInfoNode.optJSONObject(Constants.TEZ_VERSION);
+ if (tezVersion != null) {
+ String version = tezVersion.optString(Constants.VERSION);
+ String buildTime = tezVersion.optString(Constants.BUILD_TIME);
+ String revision = tezVersion.optString(Constants.REVISION);
+ this.versionInfo = new VersionInfo(version, buildTime, revision);
+ }
+ //TODO: might need to parse config info? (e.g, hive settings etc. could consume memory)
+ }
+ LOG.debug("Finished parsing tez application");
+ }
+ }
+
+ private JSONObject readJson(InputStream in) throws IOException, JSONException {
+ //Read entire content to memory
+ final ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ IOUtils.copy(in, bout);
+ return new JSONObject(new String(bout.toByteArray(), "UTF-8"));
+ }
+
+ /**
+ * Read zip file contents. Every file can contain "dag", "vertices", "tasks", "task_attempts"
+ *
+ * @param atsFile
+ * @throws IOException
+ * @throws JSONException
+ */
+ private void parseATSZipFile(File atsFile)
+ throws IOException, JSONException, TezException, InterruptedException {
+ final ZipFile atsZipFile = new ZipFile(atsFile);
+ try {
+ Enumeration<? extends ZipEntry> zipEntries = atsZipFile.entries();
+ while (zipEntries.hasMoreElements()) {
+ ZipEntry zipEntry = zipEntries.nextElement();
+ LOG.info("Processing " + zipEntry.getName());
+ InputStream inputStream = atsZipFile.getInputStream(zipEntry);
+ JSONObject jsonObject = readJson(inputStream);
+
+ //This json can contain dag, vertices, tasks, task_attempts
+ JSONObject dagJson = jsonObject.optJSONObject(Constants.DAG);
+ if (dagJson != null) {
+ //TODO: support for multiple dags per ATS file later.
+ dagInfo = DagInfo.create(dagJson);
+ }
+
+ //Process vertex
+ JSONArray vertexJson = jsonObject.optJSONArray(Constants.VERTICES);
+ if (vertexJson != null) {
+ processVertices(vertexJson);
+ }
+
+ //Process task
+ JSONArray taskJson = jsonObject.optJSONArray(Constants.TASKS);
+ if (taskJson != null) {
+ processTasks(taskJson);
+ }
+
+ //Process task attempts
+ JSONArray attemptsJson = jsonObject.optJSONArray(Constants.TASK_ATTEMPTS);
+ if (attemptsJson != null) {
+ processAttempts(attemptsJson);
+ }
+
+ //Process application (mainly versionInfo)
+ JSONObject tezAppJson = jsonObject.optJSONObject(Constants.APPLICATION);
+ if (tezAppJson != null) {
+ processApplication(tezAppJson);
+ }
+ }
+ } finally {
+ atsZipFile.close();
+ }
+ }
+}
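
A minimal parse-and-inspect sketch for the class above (not part of the patch). The dag id and zip path are placeholders following the layout written by ATSImportTool, and the accessors used are the same ones BaseParser relies on later in this patch.

    import java.io.File;

    import org.apache.tez.history.parser.ATSFileParser;
    import org.apache.tez.history.parser.datamodel.DagInfo;
    import org.apache.tez.history.parser.datamodel.TaskInfo;
    import org.apache.tez.history.parser.datamodel.VertexInfo;

    public class ParseZipExample {
      public static void main(String[] args) throws Exception {
        String dagId = "dag_1432168988000_0001_1";                            // placeholder
        File zip = new File("/tmp/ats-data/" + dagId + "/" + dagId + ".zip"); // placeholder path

        // Parse the downloaded zip into the in-memory model and count the tasks in the DAG.
        DagInfo dag = new ATSFileParser(zip).getDAGData(dagId);
        int tasks = 0;
        for (VertexInfo vertex : dag.getVertices()) {
          for (TaskInfo task : vertex.getTasks()) {
            tasks++;
          }
        }
        System.out.println("DAG " + dagId + " parsed, tasks=" + tasks);
      }
    }
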
http://git-wip-us.apache.org/repos/asf/tez/blob/11aa17e0/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/AdditionalInputOutputDetails.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/AdditionalInputOutputDetails.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/AdditionalInputOutputDetails.java
new file mode 100644
index 0000000..b853d5c
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/AdditionalInputOutputDetails.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Additional input/output information present in the DAG.
+ */
+
+@Public
+@Evolving
+public class AdditionalInputOutputDetails {
+ private final String name;
+ private final String clazz;
+ private final String initializer;
+ private final String userPayloadText;
+
+ public AdditionalInputOutputDetails(String name, String clazz, String initializer,
+ String userPayloadText) {
+ this.name = name;
+ this.clazz = clazz;
+ this.initializer = initializer;
+ this.userPayloadText = userPayloadText;
+ }
+
+ public final String getName() {
+ return name;
+ }
+
+ public final String getClazz() {
+ return clazz;
+ }
+
+ public final String getInitializer() {
+ return initializer;
+ }
+
+ public final String getUserPayloadText() {
+ return userPayloadText;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ sb.append("name=").append(name).append(", ");
+ sb.append("clazz=").append(clazz).append(", ");
+ sb.append("initializer=").append(initializer).append(", ");
+ sb.append("userPayloadText=").append(userPayloadText);
+ sb.append("]");
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/11aa17e0/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java
new file mode 100644
index 0000000..8bd6bfb
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.history.parser.utils.Utils;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public abstract class BaseInfo {
+
+ protected TezCounters tezCounters;
+ protected List<Event> eventList;
+
+ BaseInfo(JSONObject jsonObject) throws JSONException {
+ final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
+ //parse tez counters
+ tezCounters = Utils.parseTezCountersFromJSON(
+ otherInfoNode.optJSONObject(Constants.COUNTERS));
+
+ //parse events
+ eventList = Lists.newArrayList();
+ Utils.parseEvents(jsonObject.optJSONArray(Constants.EVENTS), eventList);
+ }
+
+ public TezCounters getTezCounters() {
+ return tezCounters;
+ }
+
+ /**
+ * Get start time w.r.t DAG
+ *
+ * @return long
+ */
+ public abstract long getStartTime();
+
+ /**
+ * Get finish time w.r.t DAG
+ *
+ * @return long
+ */
+ public abstract long getFinishTime();
+
+ /**
+ * Get absolute start time
+ *
+ * @return long
+ */
+ public abstract long getAbsStartTime();
+
+ /**
+ * Get absolute finish time
+ *
+ * @return long
+ */
+ public abstract long getAbsFinishTime();
+
+ public abstract String getDiagnostics();
+
+ public List<Event> getEvents() {
+ return eventList;
+ }
+
+ /**
+ * Get the counter for a specific counter group name.
+ * If counterGroupName is not specified, the counter found in every group that defines it
+ * is returned.
+ *
+ * @param counterGroupName
+ * @param counter
+ * @return Map<String, TezCounter> tez counter at every counter group level
+ */
+ public Map<String, TezCounter> getCounter(String counterGroupName, String counter) {
+ //TODO: FS, TaskCounters are directly getting added as TezCounters always pass those. Need a
+ // way to get rid of these.
+ Map<String, TezCounter> result = Maps.newHashMap();
+ Iterator<String> iterator = tezCounters.getGroupNames().iterator();
+ boolean found = false;
+ while (iterator.hasNext()) {
+ CounterGroup counterGroup = tezCounters.getGroup(iterator.next());
+ if (counterGroupName != null) {
+ String groupName = counterGroup.getName();
+ if (groupName.equals(counterGroupName)) {
+ found = true;
+ }
+ }
+
+ //Pass 'false' so that the counter is not created if it is not already present
+ TezCounter tezCounter = counterGroup.getUnderlyingGroup().findCounter(counter, false);
+ if (tezCounter != null) {
+ result.put(counterGroup.getName(), tezCounter);
+ }
+
+ if (found) {
+ //Retrieved counter specific to a counter group. Safe to exit.
+ break;
+ }
+
+ }
+ return result;
+ }
+
+ /**
+ * Find a counter in all counter groups
+ *
+ * @param counter
+ * @return Map of countergroup to TezCounter mapping
+ */
+ public Map<String, TezCounter> getCounter(String counter) {
+ return getCounter(null, counter);
+ }
+
+}
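
To illustrate the two lookup modes described in the getCounter Javadoc above, a small sketch; the counter and group names are illustrative assumptions, not taken from this patch, and the method is available on any BaseInfo subclass (DagInfo is used here).

    import java.util.Map;

    import org.apache.tez.common.counters.TezCounter;
    import org.apache.tez.history.parser.datamodel.DagInfo;

    public class CounterLookupExample {
      static void printCounter(DagInfo dag) {
        // No group name: the counter is looked up in every counter group and the result is
        // keyed by group name ("FILE_BYTES_READ" is just an illustrative counter name).
        Map<String, TezCounter> allGroups = dag.getCounter("FILE_BYTES_READ");
        for (Map.Entry<String, TezCounter> entry : allGroups.entrySet()) {
          System.out.println(entry.getKey() + " -> " + entry.getValue().getValue());
        }

        // With a group name: iteration stops once the named group has been checked
        // (the group name below is an assumption for illustration).
        Map<String, TezCounter> scoped =
            dag.getCounter("org.apache.tez.common.counters.FileSystemCounter", "FILE_BYTES_READ");
        System.out.println("scoped lookup entries: " + scoped.size());
      }
    }
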
http://git-wip-us.apache.org/repos/asf/tez/blob/11aa17e0/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java
new file mode 100644
index 0000000..a484bd5
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+
+import java.util.List;
+
+public abstract class BaseParser {
+
+ protected DagInfo dagInfo;
+ protected VersionInfo versionInfo;
+ protected final List<VertexInfo> vertexList;
+ protected final List<TaskInfo> taskList;
+ protected final List<TaskAttemptInfo> attemptList;
+
+
+ public BaseParser() {
+ vertexList = Lists.newLinkedList();
+ taskList = Lists.newLinkedList();
+ attemptList = Lists.newLinkedList();
+ }
+
+ /**
+ * Link the parsed contents, so that it becomes easier to navigate from DAG-->Task and Task-->DAG.
+ * e.g. Link vertex to dag, task to vertex, attempt to task etc.
+ */
+ protected void linkParsedContents() {
+ //Link vertex to DAG
+ for (VertexInfo vertexInfo : vertexList) {
+ vertexInfo.setDagInfo(dagInfo);
+ }
+
+ //Link task to vertex
+ for (TaskInfo taskInfo : taskList) {
+ //Link vertex to task
+ String vertexId = TezTaskID.fromString(taskInfo.getTaskId()).getVertexID().toString();
+ VertexInfo vertexInfo = dagInfo.getVertexFromId(vertexId);
+ Preconditions.checkState(vertexInfo != null, "VertexInfo for " + vertexId + " can't be "
+ + "null");
+ taskInfo.setVertexInfo(vertexInfo);
+ }
+
+ //Link task attempt to task
+ for (TaskAttemptInfo attemptInfo : attemptList) {
+ //Link task to task attempt
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString(attemptInfo
+ .getTaskAttemptId());
+ VertexInfo vertexInfo = dagInfo.getVertexFromId(taskAttemptId.getTaskID()
+ .getVertexID().toString());
+ Preconditions.checkState(vertexInfo != null, "Vertex " + taskAttemptId.getTaskID()
+ .getVertexID().toString() + " is not present in DAG");
+ TaskInfo taskInfo = vertexInfo.getTask(taskAttemptId.getTaskID().toString());
+ attemptInfo.setTaskInfo(taskInfo);
+ }
+
+ //Set container details
+ for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+ for (TaskAttemptInfo taskAttemptInfo : vertexInfo.getTaskAttempts()) {
+ dagInfo.addContainerMapping(taskAttemptInfo.getContainer(), taskAttemptInfo);
+ }
+ }
+
+
+ //Set reference time for all events
+ for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+ setReferenceTime(vertexInfo.getEvents(), dagInfo.getStartTime());
+ for (TaskInfo taskInfo : vertexInfo.getTasks()) {
+ setReferenceTime(taskInfo.getEvents(), dagInfo.getStartTime());
+ for (TaskAttemptInfo taskAttemptInfo : taskInfo.getTaskAttempts()) {
+ setReferenceTime(taskAttemptInfo.getEvents(), dagInfo.getStartTime());
+ }
+ }
+ }
+
+ dagInfo.setVersionInfo(versionInfo);
+ }
+
+ /**
+ * Set reference time to all events
+ *
+ * @param eventList
+ * @param referenceTime
+ */
+ private void setReferenceTime(List<Event> eventList, final long referenceTime) {
+ Iterables.all(eventList, new Predicate<Event>() {
+ @Override public boolean apply(Event input) {
+ input.setReferenceTime(referenceTime);
+ //Return true so that Iterables.all keeps visiting every event instead of stopping at the first.
+ return true;
+ }
+ });
+ }
+}
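
Once linkParsedContents() has wired the objects together, the model can be walked top-down. A small sketch (not part of the patch) that reports which container ran each task attempt, using only accessors referenced in the code above; the example class name is hypothetical.

    import org.apache.tez.history.parser.datamodel.Container;
    import org.apache.tez.history.parser.datamodel.DagInfo;
    import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
    import org.apache.tez.history.parser.datamodel.TaskInfo;
    import org.apache.tez.history.parser.datamodel.VertexInfo;

    public class DagWalkExample {
      // Walk DAG --> vertex --> task --> attempt, mirroring the links set up by BaseParser.
      static void printAttemptPlacement(DagInfo dag) {
        for (VertexInfo vertex : dag.getVertices()) {
          for (TaskInfo task : vertex.getTasks()) {
            for (TaskAttemptInfo attempt : task.getTaskAttempts()) {
              Container container = attempt.getContainer();
              System.out.println(attempt.getTaskAttemptId() + " ran on "
                  + container.getHost() + " (container " + container.getId() + ")");
            }
          }
        }
      }
    }
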
http://git-wip-us.apache.org/repos/asf/tez/blob/11aa17e0/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java
new file mode 100644
index 0000000..3a24f15
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import org.apache.tez.common.ATSConstants;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Private;
+
+@Private
+public class Constants extends ATSConstants {
+
+ public static final String EVENT_TIME_STAMP = "timestamp";
+ public static final String TEZ_APPLICATION = "TEZ_APPLICATION";
+ public static final String TEZ_TASK_ID = "TEZ_TASK_ID";
+ public static final String TEZ_TASK_ATTEMPT_ID = "TEZ_TASK_ATTEMPT_ID";
+
+ public static final String EDGE_ID = "edgeId";
+ public static final String INPUT_VERTEX_NAME = "inputVertexName";
+ public static final String OUTPUT_VERTEX_NAME = "outputVertexName";
+ public static final String DATA_MOVEMENT_TYPE = "dataMovementType";
+ public static final String EDGE_SOURCE_CLASS = "edgeSourceClass";
+ public static final String EDGE_DESTINATION_CLASS = "edgeDestinationClass";
+ public static final String INPUT_PAYLOAD_TEXT = "inputUserPayloadAsText";
+ public static final String OUTPUT_PAYLOAD_TEXT = "outputUserPayloadAsText";
+
+ public static final String EDGES = "edges";
+ public static final String OUT_EDGE_IDS = "outEdgeIds";
+ public static final String IN_EDGE_IDS = "inEdgeIds";
+ public static final String ADDITIONAL_INPUTS = "additionalInputs";
+ public static final String ADDITIONAL_OUTPUTS = "additionalOutputs";
+
+ public static final String NAME = "name";
+ public static final String CLASS = "class";
+ public static final String INITIALIZER = "initializer";
+ public static final String USER_PAYLOAD_TEXT = "userPayloadAsText";
+
+ //constants for ATS data export
+ public static final String DAG = "dag";
+ public static final String VERTICES = "vertices";
+ public static final String TASKS = "tasks";
+ public static final String TASK_ATTEMPTS = "task_attempts";
+ public static final String APPLICATION = "application";
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/11aa17e0/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Container.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Container.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Container.java
new file mode 100644
index 0000000..4e01d1b
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Container.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Objects;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class Container {
+
+ private final String id;
+ private final String host;
+
+ public Container(String id, String host) {
+ this.id = id;
+ this.host = host;
+ }
+
+ public final String getId() {
+ return id;
+ }
+
+ public final String getHost() {
+ return host;
+ }
+
+ @Override public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ sb.append("id=").append(id).append(", ");
+ sb.append("host=").append(host);
+ sb.append("]");
+ return sb.toString();
+ }
+
+ @Override public int hashCode() {
+ return Objects.hashCode(id, host);
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final Container other = (Container) obj;
+ return Objects.equal(this.id, other.id)
+ && Objects.equal(this.host, other.host);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/11aa17e0/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
new file mode 100644
index 0000000..0f3c3af
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
@@ -0,0 +1,555 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+import org.apache.commons.collections.BidiMap;
+import org.apache.commons.collections.bidimap.DualHashBidiMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.event.VertexState;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class DagInfo extends BaseInfo {
+
+ private static final Log LOG = LogFactory.getLog(DagInfo.class);
+
+ //Fields populated via JSON
+ private final String name;
+ private final long startTime;
+ private final long endTime;
+ private final long submitTime;
+ private final int failedTasks;
+ private final String dagId;
+ private final int numVertices;
+ private final String status;
+ private final String diagnostics;
+ private VersionInfo versionInfo;
+
+ //VertexID --> VertexName & vice versa
+ private final BidiMap vertexNameIDMapping;
+
+ //edgeId to EdgeInfo mapping
+ private final Map<Integer, EdgeInfo> edgeInfoMap;
+
+ //Only for internal parsing (vertexname mapping)
+ private Map<String, BasicVertexInfo> basicVertexInfoMap;
+
+ //VertexName --> VertexInfo
+ private Map<String, VertexInfo> vertexNameMap;
+
+ private Multimap<Container, TaskAttemptInfo> containerMapping;
+
+ DagInfo(JSONObject jsonObject) throws JSONException {
+ super(jsonObject);
+
+ vertexNameMap = Maps.newHashMap();
+ vertexNameIDMapping = new DualHashBidiMap();
+ edgeInfoMap = Maps.newHashMap();
+ basicVertexInfoMap = Maps.newHashMap();
+ containerMapping = LinkedHashMultimap.create();
+
+ Preconditions.checkArgument(
+ jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase(Constants.TEZ_DAG_ID));
+
+ dagId = jsonObject.getString(Constants.ENTITY);
+
+ //Parse additional Info
+ JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
+ startTime = otherInfoNode.optLong(Constants.START_TIME);
+ endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+ //TODO: Not getting populated correctly for lots of jobs. Verify
+ submitTime = otherInfoNode.optLong(Constants.START_REQUESTED_TIME);
+ diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
+ failedTasks = otherInfoNode.optInt(Constants.NUM_FAILED_TASKS);
+ JSONObject dagPlan = otherInfoNode.optJSONObject(Constants.DAG_PLAN);
+ name = (dagPlan != null) ? (dagPlan.optString(Constants.DAG_NAME)) : null;
+ if (dagPlan != null) {
+ JSONArray vertices = dagPlan.optJSONArray(Constants.VERTICES);
+ if (vertices != null) {
+ numVertices = vertices.length();
+ } else {
+ numVertices = 0;
+ }
+ parseDAGPlan(dagPlan);
+ } else {
+ numVertices = 0;
+ }
+ status = otherInfoNode.optString(Constants.STATUS);
+
+ //parse name id mapping
+ JSONObject vertexIDMappingJson = otherInfoNode.optJSONObject(Constants.VERTEX_NAME_ID_MAPPING);
+ if (vertexIDMappingJson != null) {
+ //get vertex name
+ for (Map.Entry<String, BasicVertexInfo> entry : basicVertexInfoMap.entrySet()) {
+ String vertexId = vertexIDMappingJson.optString(entry.getKey());
+ //vertexName --> vertexId
+ vertexNameIDMapping.put(entry.getKey(), vertexId);
+ }
+ }
+ }
+
+ public static DagInfo create(JSONObject jsonObject) throws JSONException {
+ DagInfo dagInfo = new DagInfo(jsonObject);
+ return dagInfo;
+ }
+
+ private void parseDAGPlan(JSONObject dagPlan) throws JSONException {
+ parseEdges(dagPlan.optJSONArray(Constants.EDGES));
+
+ JSONArray verticesInfo = dagPlan.optJSONArray(Constants.VERTICES);
+ parseBasicVertexInfo(verticesInfo);
+ }
+
+ private void parseBasicVertexInfo(JSONArray verticesInfo) throws JSONException {
+ if (verticesInfo == null) {
+ LOG.info("No vertices available.");
+ return;
+ }
+
+ //Parse basic information available in DAG for vertex and edges
+ for (int i = 0; i < verticesInfo.length(); i++) {
+ BasicVertexInfo basicVertexInfo = new BasicVertexInfo();
+
+ JSONObject vJson = verticesInfo.getJSONObject(i);
+ basicVertexInfo.vertexName =
+ vJson.optString(Constants.VERTEX_NAME);
+ JSONArray inEdges = vJson.optJSONArray(Constants.IN_EDGE_IDS);
+ if (inEdges != null) {
+ String[] inEdgeIds = new String[inEdges.length()];
+ for (int j = 0; j < inEdges.length(); j++) {
+ inEdgeIds[j] = inEdges.get(j).toString();
+ }
+ basicVertexInfo.inEdgeIds = inEdgeIds;
+ }
+
+ JSONArray outEdges = vJson.optJSONArray(Constants.OUT_EDGE_IDS);
+ if (outEdges != null) {
+ String[] outEdgeIds = new String[outEdges.length()];
+ for (int j = 0; j < outEdges.length(); j++) {
+ outEdgeIds[j] = outEdges.get(j).toString();
+ }
+ basicVertexInfo.outEdgeIds = outEdgeIds;
+ }
+
+ JSONArray addInputsJson =
+ vJson.optJSONArray(Constants.ADDITIONAL_INPUTS);
+ basicVertexInfo.additionalInputs = parseAdditionalDetailsForVertex(addInputsJson);
+
+ JSONArray addOutputsJson =
+ vJson.optJSONArray(Constants.ADDITIONAL_OUTPUTS);
+ basicVertexInfo.additionalOutputs = parseAdditionalDetailsForVertex(addOutputsJson);
+
+ basicVertexInfoMap.put(basicVertexInfo.vertexName, basicVertexInfo);
+ }
+ }
+
+ /**
+ * get additional details available for every vertex in the dag
+ *
+ * @param jsonArray
+ * @return AdditionalInputOutputDetails[]
+ * @throws JSONException
+ */
+ private AdditionalInputOutputDetails[] parseAdditionalDetailsForVertex(JSONArray jsonArray) throws
+ JSONException {
+ if (jsonArray != null) {
+ AdditionalInputOutputDetails[]
+ additionalInputOutputDetails = new AdditionalInputOutputDetails[jsonArray.length()];
+ for (int j = 0; j < jsonArray.length(); j++) {
+ String name = jsonArray.getJSONObject(j).optString(
+ Constants.NAME);
+ String clazz = jsonArray.getJSONObject(j).optString(
+ Constants.CLASS);
+ String initializer =
+ jsonArray.getJSONObject(j).optString(Constants.INITIALIZER);
+ String userPayloadText = jsonArray.getJSONObject(j).optString(
+ Constants.USER_PAYLOAD_TEXT);
+
+ additionalInputOutputDetails[j] =
+ new AdditionalInputOutputDetails(name, clazz, initializer, userPayloadText);
+
+ }
+ return additionalInputOutputDetails;
+ }
+ return null;
+ }
+
+ /**
+ * Parse edge details in the DAG
+ *
+ * @param edgesArray
+ *
+ * @throws JSONException
+ */
+ private void parseEdges(JSONArray edgesArray) throws JSONException {
+ if (edgesArray == null) {
+ return;
+ }
+ for (int i = 0; i < edgesArray.length(); i++) {
+ JSONObject edge = edgesArray.getJSONObject(i);
+ Integer edgeId = edge.optInt(Constants.EDGE_ID);
+ String inputVertexName =
+ edge.optString(Constants.INPUT_VERTEX_NAME);
+ String outputVertexName =
+ edge.optString(Constants.OUTPUT_VERTEX_NAME);
+ String dataMovementType =
+ edge.optString(Constants.DATA_MOVEMENT_TYPE);
+ String edgeSourceClass =
+ edge.optString(Constants.EDGE_SOURCE_CLASS);
+ String edgeDestinationClass =
+ edge.optString(Constants.EDGE_DESTINATION_CLASS);
+ String inputUserPayloadAsText =
+ edge.optString(Constants.INPUT_PAYLOAD_TEXT);
+ String outputUserPayloadAsText =
+ edge.optString(Constants.OUTPUT_PAYLOAD_TEXT);
+ EdgeInfo edgeInfo = new EdgeInfo(inputVertexName, outputVertexName,
+ dataMovementType, edgeSourceClass, edgeDestinationClass, inputUserPayloadAsText,
+ outputUserPayloadAsText);
+ edgeInfoMap.put(edgeId, edgeInfo);
+ }
+ }
+
+ static class BasicVertexInfo {
+ String vertexName;
+ String[] inEdgeIds;
+ String[] outEdgeIds;
+ AdditionalInputOutputDetails[] additionalInputs;
+ AdditionalInputOutputDetails[] additionalOutputs;
+ }
+
+ void addVertexInfo(VertexInfo vertexInfo) {
+ BasicVertexInfo basicVertexInfo = basicVertexInfoMap.get(vertexInfo.getVertexName());
+
+ Preconditions.checkArgument(basicVertexInfo != null,
+ "VerteName " + vertexInfo.getVertexName()
+ + " not present in DAG's vertices " + basicVertexInfoMap.entrySet());
+
+ //populate additional information in VertexInfo
+ if (basicVertexInfo.additionalInputs != null) {
+ vertexInfo.setAdditionalInputInfoList(Arrays.asList(basicVertexInfo.additionalInputs));
+ }
+ if (basicVertexInfo.additionalOutputs != null) {
+ vertexInfo.setAdditionalOutputInfoList(Arrays.asList(basicVertexInfo.additionalOutputs));
+ }
+
+ //Populate edge information in vertex
+ if (basicVertexInfo.inEdgeIds != null) {
+ for (String edge : basicVertexInfo.inEdgeIds) {
+ EdgeInfo edgeInfo = edgeInfoMap.get(Integer.parseInt(edge));
+ Preconditions.checkState(edgeInfo != null, "EdgeId " + edge + " not present in DAG");
+ vertexInfo.addInEdge(edgeInfo);
+ }
+ }
+
+ if (basicVertexInfo.outEdgeIds != null) {
+ for (String edge : basicVertexInfo.outEdgeIds) {
+ EdgeInfo edgeInfo = edgeInfoMap.get(Integer.parseInt(edge));
+ Preconditions.checkState(edgeInfo != null, "EdgeId " + edge + " not present in DAG");
+ vertexInfo.addOutEdge(edgeInfo);
+ }
+ }
+
+ vertexNameMap.put(vertexInfo.getVertexName(), vertexInfo);
+ }
+
+ void setVersionInfo(VersionInfo versionInfo) {
+ this.versionInfo = versionInfo;
+ }
+
+ void addContainerMapping(Container container, TaskAttemptInfo taskAttemptInfo) {
+ this.containerMapping.put(container, taskAttemptInfo);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ sb.append("dagID=").append(getDagId()).append(", ");
+ sb.append("dagName=").append(getName()).append(", ");
+ sb.append("status=").append(getStatus()).append(", ");
+ sb.append("startTime=").append(getStartTime()).append(", ");
+ sb.append("submitTime=").append(getAbsoluteSubmitTime()).append(", ");
+ sb.append("endTime=").append(getFinishTime()).append(", ");
+ sb.append("timeTaken=").append(getTimeTaken()).append(", ");
+ sb.append("diagnostics=").append(getDiagnostics()).append(", ");
+ sb.append("vertexNameIDMapping=").append(getVertexNameIDMapping()).append(", ");
+ sb.append("failedTasks=").append(getFailedTaskCount()).append(", ");
+ sb.append("events=").append(getEvents()).append(", ");
+ sb.append("status=").append(getStatus());
+ sb.append("]");
+ return sb.toString();
+ }
+
+ public Multimap<Container, TaskAttemptInfo> getContainerMapping() {
+ return Multimaps.unmodifiableMultimap(containerMapping);
+ }
+
+ public final VersionInfo getVersionInfo() {
+ return versionInfo;
+ }
+
+ public final String getName() {
+ return name;
+ }
+
+ public final Collection<EdgeInfo> getEdges() {
+ return Collections.unmodifiableCollection(edgeInfoMap.values());
+ }
+
+ public final long getAbsoluteSubmitTime() {
+ return submitTime;
+ }
+
+ public final long getAbsStartTime() {
+ return startTime;
+ }
+
+ public final long getAbsFinishTime() {
+ return endTime;
+ }
+
+ /**
+ * Reference start time for the DAG. Vertex, Task, and TaskAttempt times map onto this.
+ * If absolute start time is needed, call getAbsStartTime().
+ *
+ * @return start time relative to the dag (always 0)
+ */
+ public final long getStartTime() {
+ return 0;
+ }
+
+ @Override
+ public final long getFinishTime() {
+ long dagEndTime = (endTime - startTime);
+ if (dagEndTime < 0) {
+ //DAG may be incomplete or may have failed midway; fall back to the latest vertex finish time
+ for (VertexInfo vertexInfo : getVertices()) {
+ dagEndTime = (vertexInfo.getFinishTime() > dagEndTime) ? vertexInfo.getFinishTime() : dagEndTime;
+ }
+ }
+ return dagEndTime;
+ }
+
+ public final long getTimeTaken() {
+ return getFinishTime();
+ }
+
+ public final String getStatus() {
+ return status;
+ }
+
+ /**
+ * Get vertexInfo for a given vertex id
+ *
+ * @param vertexId
+ * @return VertexInfo
+ */
+ public VertexInfo getVertexFromId(String vertexId) {
+ return vertexNameMap.get(vertexNameIDMapping.getKey(vertexId));
+ }
+
+ /**
+ * Get vertexInfo for a given vertex name
+ *
+ * @param vertexName
+ * @return VertexInfo
+ */
+ public final VertexInfo getVertex(String vertexName) {
+ return vertexNameMap.get(vertexName);
+ }
+
+ public final String getDiagnostics() {
+ return diagnostics;
+ }
+
+ /**
+ * Get all vertices
+ *
+ * @return List<VertexInfo>
+ */
+ public final List<VertexInfo> getVertices() {
+ List<VertexInfo> vertices = Lists.newLinkedList(vertexNameMap.values());
+ Collections.sort(vertices, new Comparator<VertexInfo>() {
+
+ @Override public int compare(VertexInfo o1, VertexInfo o2) {
+ return (o1.getStartTime() < o2.getStartTime()) ? -1 :
+ ((o1.getStartTime() == o2.getStartTime()) ?
+ 0 : 1);
+ }
+ });
+ return Collections.unmodifiableList(vertices);
+ }
+
+ /**
+ * Get list of failed vertices
+ *
+ * @return List<VertexInfo>
+ */
+ public final List<VertexInfo> getFailedVertices() {
+ return getVertices(VertexState.FAILED);
+ }
+
+ /**
+ * Get list of killed vertices
+ *
+ * @return List<VertexInfo>
+ */
+ public final List<VertexInfo> getKilledVertices() {
+ return getVertices(VertexState.KILLED);
+ }
+
+ /**
+ * Get list of successful vertices
+ *
+ * @return List<VertexInfo>
+ */
+ public final List<VertexInfo> getSuccessfullVertices() {
+ return getVertices(VertexState.SUCCEEDED);
+ }
+
+ /**
+ * Get list of vertices belonging to a specific state
+ *
+ * @param state
+ * @return List<VertexInfo>
+ */
+ public final List<VertexInfo> getVertices(final VertexState state) {
+ return Collections.unmodifiableList(Lists.newLinkedList(
+ Iterables.filter(vertexNameMap.values(), new Predicate<VertexInfo>() {
+ @Override public boolean apply(VertexInfo input) {
+ return input.getStatus() != null && input.getStatus().equals(state.toString());
+ }
+ })));
+ }
+
+ public final Map<String, VertexInfo> getVertexMapping() {
+ return Collections.unmodifiableMap(vertexNameMap);
+ }
+
+ private Ordering<VertexInfo> getVertexOrdering() {
+ return Ordering.from(new Comparator<VertexInfo>() {
+ @Override public int compare(VertexInfo o1, VertexInfo o2) {
+ return (o1.getTimeTaken() < o2.getTimeTaken()) ? -1 :
+ ((o1.getTimeTaken() == o2.getTimeTaken()) ?
+ 0 : 1);
+ }
+ });
+ }
+
+ /**
+ * Get the slowest vertex in the DAG
+ *
+ * @return VertexInfo
+ */
+ public final VertexInfo getSlowestVertex() {
+ List<VertexInfo> vertexInfoList = getVertices();
+ if (vertexInfoList.size() == 0) {
+ return null;
+ }
+ return getVertexOrdering().max(vertexInfoList);
+ }
+
+ /**
+ * Get the fastest vertex in the DAG
+ *
+ * @return VertexInfo
+ */
+ public final VertexInfo getFastestVertex() {
+ List<VertexInfo> vertexInfoList = getVertices();
+ if (vertexInfoList.size() == 0) {
+ return null;
+ }
+ return getVertexOrdering().min(vertexInfoList);
+ }
+
+ /**
+ * Get node details for this DAG. Useful for analyzing how task attempts were distributed across nodes.
+ *
+ * @return Multimap<String, TaskAttemptInfo> taskAttempt details at every node
+ */
+ public final Multimap<String, TaskAttemptInfo> getNodeDetails() {
+ Multimap<String, TaskAttemptInfo> nodeDetails = LinkedListMultimap.create();
+ for (VertexInfo vertexInfo : getVertices()) {
+ Multimap<Container, TaskAttemptInfo> containerMapping = vertexInfo.getContainersMapping();
+ for (Map.Entry<Container, TaskAttemptInfo> entry : containerMapping.entries()) {
+ nodeDetails.put(entry.getKey().getHost(), entry.getValue());
+ }
+ }
+ return nodeDetails;
+ }
+
+ /**
+ * Get containers used for this DAG
+ *
+ * @return Multimap<Container, TaskAttemptInfo> task attempt details at every container
+ */
+ public final Multimap<Container, TaskAttemptInfo> getContainersToTaskAttemptMapping() {
+ List<VertexInfo> vertexInfoList = getVertices();
+ Multimap<Container, TaskAttemptInfo> containerMapping = LinkedHashMultimap.create();
+
+ for (VertexInfo vertexInfo : vertexInfoList) {
+ containerMapping.putAll(vertexInfo.getContainersMapping());
+ }
+ return Multimaps.unmodifiableMultimap(containerMapping);
+ }
+
+ public final Map getVertexNameIDMapping() {
+ return vertexNameIDMapping;
+ }
+
+ public final int getNumVertices() {
+ return numVertices;
+ }
+
+ public final String getDagId() {
+ return dagId;
+ }
+
+ public final int getFailedTaskCount() {
+ return failedTasks;
+ }
+
+}
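DagInfo above is the entry point for analysis once a DAG has been parsed: it exposes vertices sorted by start time, slowest/fastest vertex lookups, and node- and container-level task-attempt breakdowns. A hedged sketch of how a parsed DagInfo might be queried; how the DagInfo itself is obtained (e.g. via the ATS file parser) is assumed and not shown:

  import com.google.common.collect.Multimap;
  import org.apache.tez.history.parser.datamodel.DagInfo;
  import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
  import org.apache.tez.history.parser.datamodel.VertexInfo;

  public class DagSummaryExample {
    static void summarize(DagInfo dagInfo) {
      System.out.println("DAG " + dagInfo.getDagId() + ", status=" + dagInfo.getStatus()
          + ", timeTaken=" + dagInfo.getTimeTaken());

      VertexInfo slowest = dagInfo.getSlowestVertex();
      if (slowest != null) {
        System.out.println("Slowest vertex: " + slowest.getVertexName());
      }

      // host --> task attempts that ran there; handy for spotting slow or overloaded nodes.
      Multimap<String, TaskAttemptInfo> nodeDetails = dagInfo.getNodeDetails();
      for (String host : nodeDetails.keySet()) {
        System.out.println(host + " ran " + nodeDetails.get(host).size() + " task attempts");
      }
    }
  }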
http://git-wip-us.apache.org/repos/asf/tez/blob/11aa17e0/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java
new file mode 100644
index 0000000..ab8e831
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class EdgeInfo {
+
+ private final String inputVertexName;
+ private final String outputVertexName;
+ private final String dataMovementType;
+ private final String edgeSourceClass;
+ private final String edgeDestinationClass;
+ private final String inputUserPayloadAsText;
+ private final String outputUserPayloadAsText;
+
+ private VertexInfo sourceVertex;
+ private VertexInfo destinationVertex;
+
+ public EdgeInfo(String inputVertexName, String outputVertexName, String dataMovementType,
+ String edgeSourceClass, String edgeDestinationClass, String inputUserPayloadAsText, String
+ outputUserPayloadAsText) {
+ this.inputVertexName = inputVertexName;
+ this.outputVertexName = outputVertexName;
+ this.dataMovementType = dataMovementType;
+ this.edgeSourceClass = edgeSourceClass;
+ this.edgeDestinationClass = edgeDestinationClass;
+ this.inputUserPayloadAsText = inputUserPayloadAsText;
+ this.outputUserPayloadAsText = outputUserPayloadAsText;
+ }
+
+ public final String getInputVertexName() {
+ return inputVertexName;
+ }
+
+ public final String getOutputVertexName() {
+ return outputVertexName;
+ }
+
+ public final String getDataMovementType() {
+ return dataMovementType;
+ }
+
+ public final String getEdgeSourceClass() {
+ return edgeSourceClass;
+ }
+
+ public final String getEdgeDestinationClass() {
+ return edgeDestinationClass;
+ }
+
+ public final String getInputUserPayloadAsText() {
+ return inputUserPayloadAsText;
+ }
+
+ public final String getOutputUserPayloadAsText() {
+ return outputUserPayloadAsText;
+ }
+
+ public final VertexInfo getSourceVertex() {
+ return sourceVertex;
+ }
+
+ public final void setSourceVertex(VertexInfo sourceVertex) {
+ this.sourceVertex = sourceVertex;
+ }
+
+ public final VertexInfo getDestinationVertex() {
+ return destinationVertex;
+ }
+
+ public final void setDestinationVertex(VertexInfo destinationVertex) {
+ this.destinationVertex = destinationVertex;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ sb.append("inputVertexName=").append(inputVertexName).append(", ");
+ sb.append("outputVertexName=").append(outputVertexName).append(", ");
+ sb.append("dataMovementType=").append(dataMovementType).append(", ");
+ sb.append("edgeSourceClass=").append(edgeSourceClass).append(", ");
+ sb.append("edgeDestinationClass=").append(edgeDestinationClass).append(", ");
+ sb.append("inputUserPayloadAsText=").append(inputUserPayloadAsText).append(",");
+ sb.append("outputUserPayloadAsText=").append(outputUserPayloadAsText).append(", ");
+ sb.append("sourceVertex=").append(sourceVertex.getVertexName()).append(", ");
+ sb.append("destinationVertex=").append(destinationVertex.getVertexName()).append(", ");
+ sb.append("outputUserPayloadAsText=").append(outputUserPayloadAsText);
+ sb.append("]");
+ return sb.toString();
+ }
+}
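EdgeInfo above captures the two vertex names an edge connects along with the data movement type and the edge source/destination classes, so a parsed DAG can be walked as a graph. A small sketch of iterating the edges of a parsed DagInfo (again, the DagInfo is assumed to come from the parser):

  import org.apache.tez.history.parser.datamodel.DagInfo;
  import org.apache.tez.history.parser.datamodel.EdgeInfo;

  public class EdgeWalkExample {
    static void printEdges(DagInfo dagInfo) {
      for (EdgeInfo edge : dagInfo.getEdges()) {
        // One line per edge in the DAG plan, e.g. "Map 1 --SCATTER_GATHER--> Reducer 2"
        System.out.println(edge.getInputVertexName()
            + " --" + edge.getDataMovementType() + "--> "
            + edge.getOutputVertexName());
      }
    }
  }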
http://git-wip-us.apache.org/repos/asf/tez/blob/11aa17e0/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java
new file mode 100644
index 0000000..70310f3
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class Event {
+ private final String info;
+ private final String type;
+ private final long time;
+
+ private long refTime; //typically dag start time.
+
+ public Event(String info, String type, long time) {
+ this.time = time;
+ this.type = type;
+ this.info = info;
+ }
+
+ void setReferenceTime(long refTime) {
+ this.refTime = refTime;
+ }
+
+ public final String getInfo() {
+ return info;
+ }
+
+ public final String getType() {
+ return type;
+ }
+
+ public final long getAbsoluteTime() {
+ return time;
+ }
+
+ public final long getTime() {
+ return time - refTime;
+ }
+
+ @Override
+ public String toString() {
+ return "[info=" + info + ", type=" + type + ", time=" + time + "]";
+ }
+}
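Note the time convention Event shares with the rest of the data model: getAbsoluteTime() returns the raw ATS timestamp, while getTime() is relative to a reference time (typically the DAG start, which is why DagInfo.getStartTime() returns 0). setReferenceTime() is package-private and is invoked during parsing, so the relationship is illustrated below with plain arithmetic on made-up values:

  import org.apache.tez.history.parser.datamodel.Event;

  public class EventTimeExample {
    public static void main(String[] args) {
      long dagStartTime = 1431993600000L; // made-up absolute reference time
      Event vertexStarted = new Event("", "VERTEX_STARTED", dagStartTime + 1500L);
      // Once the parser sets the reference time to dagStartTime,
      // getTime() would return 1500 while getAbsoluteTime() stays absolute.
      System.out.println(vertexStarted.getAbsoluteTime() - dagStartTime); // prints 1500
    }
  }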