You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2015/12/07 23:42:58 UTC

[5/5] tez git commit: TEZ-2973. Backport Analyzers to branch-0.7

TEZ-2973. Backport Analyzers to branch-0.7


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8c8db7c5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8c8db7c5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8c8db7c5

Branch: refs/heads/branch-0.7
Commit: 8c8db7c516d0cd1ca256cddbe76bbe306e23f9c2
Parents: 868ca53
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Mon Dec 7 16:41:00 2015 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Mon Dec 7 16:41:00 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 pom.xml                                         |   5 +
 .../tez/common/counters/FileSystemCounter.java  |   4 +
 .../java/org/apache/tez/dag/utils/Graph.java    |  15 +-
 tez-plugins/pom.xml                             |   2 +
 .../tez-history-parser/findbugs-exclude.xml     |  16 +
 tez-plugins/tez-history-parser/pom.xml          | 190 +++++
 .../org/apache/tez/history/ATSImportTool.java   | 463 +++++++++++
 .../org/apache/tez/history/parser/ATSData.java  |  48 ++
 .../tez/history/parser/ATSFileParser.java       | 227 +++++
 .../tez/history/parser/SimpleHistoryParser.java | 242 ++++++
 .../datamodel/AdditionalInputOutputDetails.java |  71 ++
 .../tez/history/parser/datamodel/BaseInfo.java  | 142 ++++
 .../history/parser/datamodel/BaseParser.java    | 114 +++
 .../tez/history/parser/datamodel/Constants.java |  64 ++
 .../tez/history/parser/datamodel/Container.java |  70 ++
 .../tez/history/parser/datamodel/DagInfo.java   | 586 +++++++++++++
 .../tez/history/parser/datamodel/EdgeInfo.java  | 112 +++
 .../tez/history/parser/datamodel/Event.java     |  63 ++
 .../parser/datamodel/TaskAttemptInfo.java       | 379 +++++++++
 .../tez/history/parser/datamodel/TaskInfo.java  | 354 ++++++++
 .../history/parser/datamodel/VersionInfo.java   |  45 +
 .../history/parser/datamodel/VertexInfo.java    | 636 ++++++++++++++
 .../apache/tez/history/parser/utils/Utils.java  | 139 ++++
 .../apache/tez/history/TestHistoryParser.java   | 813 ++++++++++++++++++
 .../analyzers/job-analyzer/findbugs-exclude.xml |  28 +
 tez-tools/analyzers/job-analyzer/pom.xml        | 168 ++++
 .../java/org/apache/tez/analyzer/Analyzer.java  |  64 ++
 .../java/org/apache/tez/analyzer/CSVResult.java | 115 +++
 .../java/org/apache/tez/analyzer/Result.java    |  39 +
 .../tez/analyzer/plugins/AnalyzerDriver.java    |  59 ++
 .../plugins/ContainerReuseAnalyzer.java         |  97 +++
 .../analyzer/plugins/CriticalPathAnalyzer.java  | 646 +++++++++++++++
 .../tez/analyzer/plugins/LocalityAnalyzer.java  | 204 +++++
 .../analyzer/plugins/ShuffleTimeAnalyzer.java   | 223 +++++
 .../tez/analyzer/plugins/SkewAnalyzer.java      | 323 ++++++++
 .../tez/analyzer/plugins/SlowNodeAnalyzer.java  | 197 +++++
 .../analyzer/plugins/SlowTaskIdentifier.java    | 126 +++
 .../analyzer/plugins/SlowestVertexAnalyzer.java | 219 +++++
 .../tez/analyzer/plugins/SpillAnalyzerImpl.java | 145 ++++
 .../plugins/TaskConcurrencyAnalyzer.java        | 148 ++++
 .../tez/analyzer/plugins/TezAnalyzerBase.java   | 213 +++++
 .../VertexLevelCriticalPathAnalyzer.java        | 152 ++++
 .../org/apache/tez/analyzer/utils/SVGUtils.java | 334 ++++++++
 .../org/apache/tez/analyzer/utils/Utils.java    | 100 +++
 .../org/apache/tez/analyzer/TestAnalyzer.java   | 823 +++++++++++++++++++
 tez-tools/analyzers/pom.xml                     |  51 ++
 tez-tools/pom.xml                               |   4 +
 48 files changed, 9275 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7fb7957..8c23aa6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-2973. Backport Analyzers to branch-0.7
   TEZ-2975. Bump up apache commons dependency.
   TEZ-2970. Re-localization in TezChild does not use correct UGI.
   TEZ-2968. Counter limits exception causes AM to crash.

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3b7aaf4..169353f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -176,6 +176,11 @@
         <version>${pig.version}</version>
       </dependency>
       <dependency>
+        <groupId>io.dropwizard.metrics</groupId>
+        <artifactId>metrics-core</artifactId>
+        <version>3.1.0</version>
+      </dependency>
+      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.5</version>

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
index 57d1053..73e3581 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
@@ -27,4 +27,8 @@ public enum FileSystemCounter {
   READ_OPS,
   LARGE_READ_OPS,
   WRITE_OPS,
+  HDFS_BYTES_READ,
+  HDFS_BYTES_WRITTEN,
+  FILE_BYTES_READ,
+  FILE_BYTES_WRITTEN
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
index cc9033d..1d8e395 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
@@ -62,6 +62,7 @@ public class Graph {
     List<Edge> outs;
     String label;
     String shape;
+    String color;
 
     public Node(String id) {
       this(id, null);
@@ -104,6 +105,10 @@ public class Graph {
     public void setShape(String shape) {
       this.shape = shape;
     }
+
+    public void setColor(String color) {
+      this.color = color;
+    }
   }
 
   private String name;
@@ -196,17 +201,19 @@ public class Graph {
     for (Node n : nodes) {
       if (n.shape != null && !n.shape.isEmpty()) {
         sb.append(String.format(
-            "%s%s [ label = %s, shape = %s ];",
+            "%s%s [ label = %s, shape = %s , color= %s];",
             indent,
             wrapSafeString(n.getUniqueId()),
             wrapSafeString(n.getLabel()),
-            wrapSafeString(n.shape)));
+            wrapSafeString(n.shape),
+            wrapSafeString(n.color == null ? "black" : n.color)));
       } else {
         sb.append(String.format(
-            "%s%s [ label = %s ];",
+            "%s%s [ label = %s , color= %s ];",
             indent,
             wrapSafeString(n.getUniqueId()),
-            wrapSafeString(n.getLabel())));
+            wrapSafeString(n.getLabel()),
+            wrapSafeString(n.color == null ? "black" : n.color)));
       }
       sb.append(System.getProperty("line.separator"));
       List<Edge> combinedOuts = combineEdges(n.outs);

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/pom.xml b/tez-plugins/pom.xml
index 89b9d1e..fe7c61a 100644
--- a/tez-plugins/pom.xml
+++ b/tez-plugins/pom.xml
@@ -34,6 +34,7 @@
       </activation>
       <modules>
         <module>tez-yarn-timeline-history</module>
+        <module>tez-history-parser</module>
       </modules>
     </profile>
     <profile>
@@ -46,6 +47,7 @@
       <modules>
         <module>tez-yarn-timeline-history</module>
         <module>tez-yarn-timeline-history-with-acls</module>
+        <module>tez-history-parser</module>
       </modules>
     </profile>
   </profiles>

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/findbugs-exclude.xml b/tez-plugins/tez-history-parser/findbugs-exclude.xml
new file mode 100644
index 0000000..5b11308
--- /dev/null
+++ b/tez-plugins/tez-history-parser/findbugs-exclude.xml
@@ -0,0 +1,16 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<FindBugsFilter>
+
+</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/pom.xml b/tez-plugins/tez-history-parser/pom.xml
new file mode 100644
index 0000000..7dce608
--- /dev/null
+++ b/tez-plugins/tez-history-parser/pom.xml
@@ -0,0 +1,190 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez-plugins</artifactId>
+    <version>0.7.1-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-history-parser</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-yarn-timeline-history</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-yarn-timeline-history</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jettison</groupId>
+      <artifactId>jettison</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-json</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifest>
+              <mainClass>org.apache.tez.history.ATSImportTool</mainClass>
+            </manifest>
+          </archive>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+          <archive>
+            <manifest>
+              <addClasspath>true</addClasspath>
+              <mainClass>org.apache.tez.history.ATSImportTool</mainClass>
+            </manifest>
+          </archive>
+        </configuration>
+        <executions>
+          <execution>
+            <id>assemble-all</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
new file mode 100644
index 0000000..3efeb5a
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.MissingOptionException;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.history.parser.datamodel.Constants;
+import org.apache.tez.history.parser.utils.Utils;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MediaType;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * <pre>
+ * Simple tool which imports ATS data pertaining to a DAG (Dag, Vertex, Task, Attempt)
+ * and creates a zip file out of it.
+ *
+ * usage:
+ *
+ * java -cp tez-history-parser-x.y.z-jar-with-dependencies.jar org.apache.tez.history.ATSImportTool
+ *
+ * OR
+ *
+ * HADOOP_CLASSPATH=$TEZ_HOME/*:$TEZ_HOME/lib/*:$HADOOP_CLASSPATH hadoop jar tez-history-parser-x.y.z.jar org.apache.tez.history.ATSImportTool
+ *
+ *
+ * --yarnTimelineAddress <yarnTimelineAddress>  Optional. Yarn Timeline Address(e.g http://clusterATSNode:8188)
+ * --batchSize <batchSize>       Optional. batch size for downloading data
+ * --dagId <dagId>               DagId that needs to be downloaded
+ * --downloadDir <downloadDir>   download directory where data needs to be downloaded
+ * --help                        print help
+ *
+ * </pre>
+ */
+@Evolving
+public class ATSImportTool extends Configured implements Tool {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ATSImportTool.class);
+
+  private static final String BATCH_SIZE = "batchSize";
+  private static final int BATCH_SIZE_DEFAULT = 100;
+
+  private static final String YARN_TIMELINE_SERVICE_ADDRESS = "yarnTimelineAddress";
+  private static final String DAG_ID = "dagId";
+  private static final String BASE_DOWNLOAD_DIR = "downloadDir";
+
+  private static final String HTTPS_SCHEME = "https://";
+  private static final String HTTP_SCHEME = "http://";
+
+  private static final String VERTEX_QUERY_STRING = "%s/%s?limit=%s&primaryFilter=%s:%s";
+  private static final String TASK_QUERY_STRING = "%s/%s?limit=%s&primaryFilter=%s:%s";
+  private static final String TASK_ATTEMPT_QUERY_STRING = "%s/%s?limit=%s&primaryFilter=%s:%s";
+  private static final String UTF8 = "UTF-8";
+
+  private static final String LINE_SEPARATOR = System.getProperty("line.separator");
+
+  private final int batchSize;
+  private final String baseUri;
+  private final String dagId;
+
+  private final File zipFile;
+  private final Client httpClient;
+  private final TezDAGID tezDAGID;
+
+  /**
+   * Creates a tool that downloads the ATS data of one DAG into
+   * {@code downloadDir/dagId.zip}.
+   *
+   * @param baseUri base URL that is prepended to every timeline request
+   * @param dagId id of the DAG to download; must not be null or empty
+   * @param downloadDir directory for the zip file; created when it does not exist yet
+   * @param batchSize value sent as the "limit" query parameter of each request; must be positive
+   * @throws IllegalArgumentException if an argument is invalid or downloadDir does not exist
+   *     after the attempt to create it
+   */
+  public ATSImportTool(String baseUri, String dagId, File downloadDir, int batchSize) {
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(dagId), "dagId can not be null or empty");
+    Preconditions.checkArgument(downloadDir != null, "downloadDir can not be null");
+    // A non-positive batch size can never satisfy the "short batch" exit test of the paging
+    // loop and would be sent to the server verbatim as limit=, so reject it up front.
+    Preconditions.checkArgument(batchSize > 0, "batchSize should be greater than 0");
+    tezDAGID = TezDAGID.fromString(dagId);
+
+    this.baseUri = baseUri;
+    this.batchSize = batchSize;
+    this.dagId = dagId;
+
+    // The httpClient field is still null here, so getHttpClient() builds the client.
+    this.httpClient = getHttpClient();
+
+    this.zipFile = new File(downloadDir, this.dagId + ".zip");
+
+    boolean result = downloadDir.mkdirs();
+    LOG.trace("Result of creating dir {}={}", downloadDir, result);
+    if (!downloadDir.exists()) {
+      throw new IllegalArgumentException("dir=" + downloadDir + " does not exist");
+    }
+
+    LOG.info("Using baseURL={}, dagId={}, batchSize={}, downloadDir={}", baseUri, dagId,
+        batchSize, downloadDir);
+  }
+
+  /**
+   * Download data from ATS for specific DAG and store it in the zip file.
+   * The HTTP client is destroyed afterwards, whether or not the download succeeded.
+   *
+   * @throws Exception if fetching from ATS or writing/finishing the zip file fails
+   */
+  private void download() throws Exception {
+    FileOutputStream fos = null;
+    try {
+      fos = new FileOutputStream(zipFile, false);
+      ZipOutputStream zos = new ZipOutputStream(fos);
+      downloadData(zos);
+      // close() rather than closeQuietly(): closing finishes the archive, and a failure at
+      // that point must be reported instead of silently leaving a truncated zip behind.
+      zos.close();
+    } catch (Exception e) {
+      LOG.error("Exception in download", e);
+      throw e;
+    } finally {
+      if (httpClient != null) {
+        httpClient.destroy();
+      }
+      // Covers the failure paths where zos was never created or never closed.
+      IOUtils.closeQuietly(fos);
+    }
+  }
+
+  /**
+   * Download DAG data (DAG, Vertex, Task, TaskAttempts) from ATS and write to zip file
+   *
+   * @param zos zip stream that receives one entry named after the dagId (application and DAG
+   *            details) followed by the entries written for vertices, tasks and task attempts
+   * @throws TezException if a request to the timeline server fails
+   * @throws JSONException if the downloaded JSON can not be assembled or formatted
+   * @throws IOException if writing to the zip stream fails
+   */
+  private void downloadData(ZipOutputStream zos) throws TezException, JSONException, IOException {
+    JSONObject finalJson = new JSONObject();
+
+    //Download application details (TEZ_VERSION etc); the entity id is "tez_" + application id
+    String tezAppId = "tez_" + tezDAGID.getApplicationId().toString();
+    String tezAppUrl = String.format("%s/%s/%s", baseUri, Constants.TEZ_APPLICATION, tezAppId);
+    JSONObject tezAppJson = getJsonRootEntity(tezAppUrl);
+    finalJson.put(Constants.APPLICATION, tezAppJson);
+
+    //Download dag
+    String dagUrl = String.format("%s/%s/%s", baseUri, Constants.TEZ_DAG_ID, dagId);
+    JSONObject dagRoot = getJsonRootEntity(dagUrl);
+    finalJson.put(Constants.DAG, dagRoot);
+
+    //Create a zip entry with dagId as its name.
+    ZipEntry zipEntry = new ZipEntry(dagId);
+    zos.putNextEntry(zipEntry);
+    //Write in formatted way (toString(4) indents the JSON by 4 spaces)
+    IOUtils.write(finalJson.toString(4), zos, UTF8);
+
+    //Download vertex (all entities of the type whose primary filter matches this dagId)
+    String vertexURL =
+        String.format(VERTEX_QUERY_STRING, baseUri,
+            Constants.TEZ_VERTEX_ID, batchSize, Constants.TEZ_DAG_ID, dagId);
+    downloadJSONArrayFromATS(vertexURL, zos, Constants.VERTICES);
+
+    //Download task
+    String taskURL = String.format(TASK_QUERY_STRING, baseUri,
+        Constants.TEZ_TASK_ID, batchSize, Constants.TEZ_DAG_ID, dagId);
+    downloadJSONArrayFromATS(taskURL, zos, Constants.TASKS);
+
+    //Download task attempts
+    String taskAttemptURL = String.format(TASK_ATTEMPT_QUERY_STRING, baseUri,
+        Constants.TEZ_TASK_ATTEMPT_ID, batchSize, Constants.TEZ_DAG_ID, dagId);
+    downloadJSONArrayFromATS(taskAttemptURL, zos, Constants.TASK_ATTEMPTS);
+  }
+
+  /**
+   * Download data from ATS in batches. Every non-empty batch is written to its own zip entry
+   * as a JSON object of the form {tag: [entities]}.
+   *
+   * @param url query URL of the first batch; later batches append "&fromId=..." to it
+   * @param zos zip stream to write to; must not be null
+   * @param tag JSON key under which the entities of each batch are stored
+   * @throws IOException if writing to the zip stream fails
+   * @throws TezException if a request to the timeline server fails
+   * @throws JSONException if an entity lacks the expected fields or the JSON can not be formatted
+   */
+  private void downloadJSONArrayFromATS(String url, ZipOutputStream zos, String tag)
+      throws IOException, TezException, JSONException {
+
+    Preconditions.checkArgument(zos != null, "ZipOutputStream can not be null");
+
+    String baseUrl = url;
+    JSONArray entities;
+
+    long downloadedCount = 0;
+    // Stop when the response has no entities array or the array is empty.
+    while ((entities = getJsonRootEntity(url).optJSONArray(Constants.ENTITIES)) != null
+        && entities.length() > 0) {
+
+      // NOTE(review): limit is only logged; all downloaded entities are written below.
+      int limit = (entities.length() >= batchSize) ? (entities.length() - 1) : entities.length();
+      LOG.debug("Limit={}, downloaded entities len={}", limit, entities.length());
+
+      //write downloaded part to zipfile.  This is done to avoid any memory pressure when
+      // downloading and writing 1000s of tasks.
+      // NOTE(review): the entry name is derived from the current time in milliseconds only;
+      // presumably batches never complete within the same millisecond - confirm.
+      ZipEntry zipEntry = new ZipEntry("part-" + System.currentTimeMillis() + ".json");
+      zos.putNextEntry(zipEntry);
+      JSONObject finalJson = new JSONObject();
+      finalJson.put(tag, entities);
+      IOUtils.write(finalJson.toString(4), zos, "UTF-8");
+      downloadedCount += entities.length();
+
+      // A batch shorter than batchSize is treated as the last one.
+      if (entities.length() < batchSize) {
+        break;
+      }
+
+      //Set the last item in entities as the fromId
+      // NOTE(review): if the server treats fromId as inclusive, the last entity of this batch
+      // is downloaded again as the first entity of the next one - TODO confirm.
+      url = baseUrl + "&fromId="
+          + entities.getJSONObject(entities.length() - 1).getString(Constants.ENTITY);
+
+      String firstItem = entities.getJSONObject(0).getString(Constants.ENTITY);
+      String lastItem = entities.getJSONObject(entities.length() - 1).getString(Constants.ENTITY);
+      LOG.info("Downloaded={}, First item={}, LastItem={}, new url={}", downloadedCount,
+          firstItem, lastItem, url);
+    }
+  }
+
+  /**
+   * Logs the status and the complete body of a failed response at error level.
+   *
+   * @param response response whose entity stream is read line by line and then closed
+   * @throws IOException if the entity stream can not be opened for reading
+   */
+  private void logErrorMessage(ClientResponse response) throws IOException {
+    // getClientResponseStatus() is null when the status code has no ClientResponse.Status
+    // mapping; let the logger format the value instead of calling toString() on it, so an
+    // unusual status code does not turn into a NullPointerException that hides the real error.
+    LOG.error("Response status={}", response.getClientResponseStatus());
+    LineIterator it = null;
+    try {
+      it = IOUtils.lineIterator(response.getEntityInputStream(), UTF8);
+      while (it.hasNext()) {
+        String line = it.nextLine();
+        LOG.error(line);
+      }
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+    }
+  }
+
+  /**
+   * Issues a GET request for the given URL and returns the response body as a JSON object.
+   * A response whose status is not OK is logged in full and reported as a TezException.
+   *
+   * @param url complete URL to fetch
+   * @return root JSON object of the response
+   * @throws TezException if the status is not OK, or the client reports a handler error,
+   *     an unexpected response or an invalid URL
+   * @throws IOException if the body of a failed response can not be read for logging
+   */
+  //For secure cluster, this should work as long as valid ticket is available in the node.
+  private JSONObject getJsonRootEntity(String url) throws TezException, IOException {
+    try {
+      WebResource wr = getHttpClient().resource(url);
+      ClientResponse response = wr.accept(MediaType.APPLICATION_JSON_TYPE)
+          .type(MediaType.APPLICATION_JSON_TYPE)
+          .get(ClientResponse.class);
+
+      if (response.getClientResponseStatus() != ClientResponse.Status.OK) {
+        // In the case of secure cluster, if there is any auth exception it sends the data back as
+        // a html page and JSON parsing could throw exceptions. Instead, get the stream contents
+        // completely and log it in case of error.
+        logErrorMessage(response);
+        throw new TezException("Failed to get response from YARN Timeline: url: " + url);
+      }
+      return response.getEntity(JSONObject.class);
+    } catch (ClientHandlerException e) {
+      throw new TezException("Error processing response from YARN Timeline. URL=" + url, e);
+    } catch (UniformInterfaceException e) {
+      throw new TezException("Error accessing content from YARN Timeline - unexpected response. "
+          + "URL=" + url, e);
+    } catch (IllegalArgumentException e) {
+      throw new TezException("Error accessing content from YARN Timeline - invalid url. URL=" + url,
+          e);
+    }
+  }
+
+  /**
+   * Returns the HTTP client of this tool. While the httpClient field is still null (as it is
+   * when the constructor calls this method) a new client is built whose connections come from
+   * PseudoAuthenticatedURLConnectionFactory; afterwards the stored client is returned.
+   *
+   * @return the stored client, or a newly created one if none is stored yet
+   */
+  private Client getHttpClient() {
+    if (httpClient == null) {
+      ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class);
+      HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory();
+      return new Client(new URLConnectionClientHandler(urlFactory), config);
+    }
+    return httpClient;
+  }
+
+  /**
+   * Connection factory that appends a "user.name" query parameter, holding the URL-encoded
+   * short name of the current user, to every URL before opening the connection.
+   */
+  static class PseudoAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory {
+    @Override
+    public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+      // Start the query with "?" when the URL has none, otherwise extend it with "&".
+      String tokenString = (url.getQuery() == null ? "?" : "&") + "user.name=" +
+          URLEncoder.encode(UserGroupInformation.getCurrentUser().getShortUserName(), "UTF8");
+      return (HttpURLConnection) (new URL(url.toString() + tokenString)).openConnection();
+    }
+  }
+
+  /**
+   * Runs the download configured through the constructor.
+   *
+   * @param args not read by this method
+   * @return 0 if the download succeeded, -1 if it failed with an exception
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    try {
+      download();
+      return 0;
+    } catch (Exception e) {
+      // The failure is reported on stderr and in the log, then mapped to the exit code.
+      e.printStackTrace();
+      LOG.error("Error occurred when downloading data ", e);
+      return -1;
+    }
+  }
+
+  /**
+   * Builds the command line options of the tool: the required --dagId and --downloadDir, the
+   * optional --yarnTimelineAddress and --batchSize (each taking one argument) and --help.
+   *
+   * @return the options understood by the tool
+   */
+  private static Options buildOptions() {
+    Option dagIdOption = OptionBuilder.withArgName(DAG_ID).withLongOpt(DAG_ID)
+        .withDescription("DagId that needs to be downloaded").hasArg().isRequired(true).create();
+
+    Option downloadDirOption = OptionBuilder.withArgName(BASE_DOWNLOAD_DIR).withLongOpt
+        (BASE_DOWNLOAD_DIR)
+        .withDescription("Download directory where data needs to be downloaded").hasArg()
+        .isRequired(true).create();
+
+    Option atsAddressOption = OptionBuilder.withArgName(YARN_TIMELINE_SERVICE_ADDRESS).withLongOpt(
+        YARN_TIMELINE_SERVICE_ADDRESS)
+        .withDescription("Optional. ATS address (e.g http://clusterATSNode:8188)").hasArg()
+        .isRequired(false)
+        .create();
+
+    Option batchSizeOption = OptionBuilder.withArgName(BATCH_SIZE).withLongOpt(BATCH_SIZE)
+        .withDescription("Optional. batch size for downloading data").hasArg()
+        .isRequired(false)
+        .create();
+
+    // --help takes no argument.
+    Option help = OptionBuilder.withArgName("help").withLongOpt("help")
+        .withDescription("print help").isRequired(false).create();
+
+    Options opts = new Options();
+    opts.addOption(dagIdOption);
+    opts.addOption(downloadDirOption);
+    opts.addOption(atsAddressOption);
+    opts.addOption(batchSizeOption);
+    opts.addOption(help);
+    return opts;
+  }
+
+  /**
+   * Prints the two ways of invoking the tool followed by the description of the given options,
+   * formatted for a width of 240 characters.
+   *
+   * @param options options to describe
+   */
+  static void printHelp(Options options) {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.setWidth(240);
+    String help = LINE_SEPARATOR
+        + "java -cp tez-history-parser-x.y.z-jar-with-dependencies.jar org.apache.tez.history.ATSImportTool"
+        + LINE_SEPARATOR
+        + "OR"
+        + LINE_SEPARATOR
+        + "HADOOP_CLASSPATH=$TEZ_HOME/*:$TEZ_HOME/lib/*:$HADOOP_CLASSPATH hadoop jar "
+        + "tez-history-parser-x.y.z.jar " + ATSImportTool.class.getName()
+        + LINE_SEPARATOR;
+    formatter.printHelp(240, help, "Options", options, "", true);
+  }
+
+  /**
+   * Tells whether the YARN HTTP policy of the given configuration is HTTPS_ONLY. The default
+   * policy is used when the configuration does not set one.
+   *
+   * @param conf configuration to inspect
+   * @return true if the effective policy is HTTPS_ONLY
+   */
+  static boolean hasHttpsPolicy(Configuration conf) {
+    YarnConfiguration yarnConf = new YarnConfiguration(conf);
+    return (HttpConfig.Policy.HTTPS_ONLY == HttpConfig.Policy.fromString(yarnConf
+        .get(YarnConfiguration.YARN_HTTP_POLICY_KEY, YarnConfiguration.YARN_HTTP_POLICY_DEFAULT)));
+  }
+
+  /**
+   * Builds the base URL of the timeline resources.
+   * <p/>
+   * When no address is given, it is read from the configuration (the https or the http webapp
+   * address, depending on the HTTP policy). The address is trimmed and lower-cased, prefixed
+   * with a scheme chosen by the HTTP policy if it has none, normalized, stripped of a trailing
+   * "/" and finally joined with Constants.RESOURCE_URI_BASE.
+   *
+   * @param yarnTimelineAddress address given by the user; null to use the configuration
+   * @param conf configuration supplying the HTTP policy and the default addresses
+   * @return base URL for timeline requests
+   * @throws TezException if the resulting address is not a valid URI
+   * @throws IllegalArgumentException if no non-empty address is available
+   */
+  static String getBaseTimelineURL(String yarnTimelineAddress, Configuration conf)
+      throws TezException {
+    boolean isHttps = hasHttpsPolicy(conf);
+
+    if (yarnTimelineAddress == null) {
+      if (isHttps) {
+        yarnTimelineAddress = conf.get(Constants.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS_CONF_NAME);
+      } else {
+        yarnTimelineAddress = conf.get(Constants.TIMELINE_SERVICE_WEBAPP_HTTP_ADDRESS_CONF_NAME);
+      }
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(yarnTimelineAddress), "Yarn timeline address can"
+          + " not be empty. Please check configurations.");
+    } else {
+      yarnTimelineAddress = yarnTimelineAddress.trim();
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(yarnTimelineAddress), "Yarn timeline address can"
+          + " not be empty. Please provide valid url with --" +
+          YARN_TIMELINE_SERVICE_ADDRESS + " option");
+    }
+
+    // Locale.ROOT keeps the lower-casing independent of the JVM default locale; with e.g. a
+    // Turkish default locale a plain toLowerCase() maps "I" to a dotless i and corrupts the
+    // address.
+    yarnTimelineAddress = yarnTimelineAddress.toLowerCase(java.util.Locale.ROOT);
+    if (!yarnTimelineAddress.startsWith(HTTP_SCHEME)
+        && !yarnTimelineAddress.startsWith(HTTPS_SCHEME)) {
+      yarnTimelineAddress = ((isHttps) ? HTTPS_SCHEME : HTTP_SCHEME) + yarnTimelineAddress;
+    }
+
+    try {
+      yarnTimelineAddress = new URI(yarnTimelineAddress).normalize().toString().trim();
+      // Drop a single trailing "/" so that joining with the resource base yields no "//".
+      yarnTimelineAddress = (yarnTimelineAddress.endsWith("/")) ?
+          yarnTimelineAddress.substring(0, yarnTimelineAddress.length() - 1) :
+          yarnTimelineAddress;
+    } catch (URISyntaxException e) {
+      throw new TezException("Please provide a valid URL. url=" + yarnTimelineAddress, e);
+    }
+
+    return Joiner.on("").join(yarnTimelineAddress, Constants.RESOURCE_URI_BASE);
+  }
+
+  /**
+   * Parses the command line, creates the tool and runs it through ToolRunner.
+   *
+   * @param args command line arguments (see buildOptions)
+   * @return the exit code returned by ToolRunner
+   * @throws Exception any failure; a ParseException additionally causes the help to be printed
+   */
+  @VisibleForTesting
+  public static int process(String[] args) throws Exception {
+    Options options = buildOptions();
+    try {
+      Configuration conf = new Configuration();
+      CommandLine cmdLine = new GnuParser().parse(options, args);
+      String dagId = cmdLine.getOptionValue(DAG_ID);
+
+      File downloadDir = new File(cmdLine.getOptionValue(BASE_DOWNLOAD_DIR));
+
+      // yarnTimelineAddress is null when the option is absent; the address is then taken
+      // from the configuration by getBaseTimelineURL.
+      String yarnTimelineAddress = cmdLine.getOptionValue(YARN_TIMELINE_SERVICE_ADDRESS);
+      String baseTimelineURL = getBaseTimelineURL(yarnTimelineAddress, conf);
+
+      int batchSize = (cmdLine.hasOption(BATCH_SIZE)) ?
+          (Integer.parseInt(cmdLine.getOptionValue(BATCH_SIZE))) : BATCH_SIZE_DEFAULT;
+
+      return ToolRunner.run(conf, new ATSImportTool(baseTimelineURL, dagId,
+          downloadDir, batchSize), args);
+    } catch (ParseException e) {
+      LOG.error("Error in parsing options ", e);
+      printHelp(options);
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Error in processing ", e);
+      throw e;
+    }
+  }
+
+  /**
+   * Command line entry point: sets up the root logger, processes the arguments and exits the
+   * JVM with the resulting code.
+   *
+   * @param args command line arguments
+   * @throws Exception if processing the arguments fails
+   */
+  public static void main(String[] args) throws Exception {
+    Utils.setupRootLogger();
+    int res = process(args);
+    System.exit(res);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSData.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSData.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSData.java
new file mode 100644
index 0000000..f504007
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSData.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser;
+
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.codehaus.jettison.json.JSONException;
+
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Main interface to pull data from ATS.
+ * <p/>
+ * ATS data may be stored in any DB (e.g. LevelDB or HBase).  Depending on the store, there
+ * can be multiple implementations that pull data from these stores and create the
+ * DagInfo object for analysis.
+ * <p/>
+ */
+@Evolving
+public interface ATSData {
+
+  /**
+   * Get the DAG representation for processing
+   *
+   * @param dagId id of the DAG whose data is requested
+   * @return DagInfo
+   * @throws TezException if the DAG data cannot be obtained; JSONException is not part of
+   *                      this signature, so implementations must wrap it
+   */
+  public DagInfo getDAGData(String dagId) throws TezException;
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
new file mode 100644
index 0000000..aae20eb
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.BaseParser;
+import org.apache.tez.history.parser.datamodel.Constants;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VersionInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Enumeration;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Simple class to parse ATS zip file of a DAG and generate the relevant in-memory structure
+ * (DagInfo) necessary for processing later.
+ */
+
+@Public
+@Evolving
+public class ATSFileParser extends BaseParser implements ATSData {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ATSFileParser.class);
+
+  private final File atsZipFile;
+
+  public ATSFileParser(File atsZipFile) throws TezException {
+    super();
+    Preconditions.checkArgument(atsZipFile.exists(), "Zipfile " + atsZipFile + " does not exist");
+    this.atsZipFile = atsZipFile;
+  }
+
+  @Override
+  public DagInfo getDAGData(String dagId) throws TezException {
+    // NOTE: dagId is not consulted here; the DAG found in the zip file is what gets returned.
+    try {
+      parseATSZipFile(atsZipFile);
+      linkParsedContents();
+      return dagInfo;
+    } catch (IOException e) {
+      LOG.error("Error in reading DAG ", e);
+      throw new TezException(e);
+    } catch (JSONException e) {
+      LOG.error("Error in parsing DAG ", e);
+      throw new TezException(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt status for callers
+      throw new TezException(e);
+    }
+  }
+
+  /**
+   * Parse vertices json and append one VertexInfo per array element to vertexList.
+   *
+   * @param verticesJson array of vertex entities; must not be null
+   * @throws JSONException if an element cannot be read or converted
+   * @throws IllegalStateException if verticesJson is null
+   */
+  private void processVertices(JSONArray verticesJson) throws JSONException {
+    //Process vertex information; fail fast on null so no further null guard is needed
+    Preconditions.checkState(verticesJson != null, "Vertex json can not be null");
+
+    LOG.info("Started parsing vertex");
+    for (int idx = 0; idx < verticesJson.length(); idx++) {
+      VertexInfo parsed = VertexInfo.create(verticesJson.getJSONObject(idx));
+      vertexList.add(parsed);
+    }
+    LOG.info("Finished parsing vertex");
+  }
+
+  /**
+   * Parse Tasks json and append one TaskInfo per array element to taskList.
+   *
+   * @param tasksJson array of task entities; must not be null
+   * @throws JSONException if an element cannot be read or converted
+   * @throws IllegalStateException if tasksJson is null
+   */
+  private void processTasks(JSONArray tasksJson) throws JSONException {
+    //Process Task information; fail fast on null so no further null guard is needed
+    Preconditions.checkState(tasksJson != null, "Task json can not be null");
+
+    LOG.debug("Started parsing task");
+    for (int idx = 0; idx < tasksJson.length(); idx++) {
+      TaskInfo parsed = TaskInfo.create(tasksJson.getJSONObject(idx));
+      taskList.add(parsed);
+    }
+    LOG.debug("Finished parsing task");
+  }
+
+  /**
+   * Parse TaskAttempt json and append one TaskAttemptInfo per array element to attemptList.
+   *
+   * @param taskAttemptsJson array of task attempt entities; must not be null
+   * @throws JSONException if an element cannot be read or converted
+   * @throws IllegalStateException if taskAttemptsJson is null
+   */
+  private void processAttempts(JSONArray taskAttemptsJson) throws JSONException {
+    //Process TaskAttempt information; fail fast on null so no further null guard is needed
+    Preconditions.checkState(taskAttemptsJson != null, "Attempts json can not be null");
+
+    LOG.debug("Started parsing task attempts");
+    for (int idx = 0; idx < taskAttemptsJson.length(); idx++) {
+      TaskAttemptInfo parsed = TaskAttemptInfo.create(taskAttemptsJson.getJSONObject(idx));
+      attemptList.add(parsed);
+    }
+    LOG.debug("Finished parsing task attempts");
+  }
+
+  /**
+   * Parse TezApplication json and record the Tez version details when they are present.
+   *
+   * @param tezApplicationJson application entity; a null value is ignored
+   * @throws JSONException declared in the signature (only lenient opt* lookups are used here)
+   */
+  private void processApplication(JSONObject tezApplicationJson) throws JSONException {
+    if (tezApplicationJson == null) {
+      return;
+    }
+    LOG.debug("Started parsing tez application");
+    JSONObject otherInfo = tezApplicationJson.optJSONObject(Constants.OTHER_INFO);
+    JSONObject versionNode =
+        (otherInfo == null) ? null : otherInfo.optJSONObject(Constants.TEZ_VERSION);
+    if (versionNode != null) {
+      String version = versionNode.optString(Constants.VERSION);
+      String buildTime = versionNode.optString(Constants.BUILD_TIME);
+      String revision = versionNode.optString(Constants.REVISION);
+      this.versionInfo = new VersionInfo(version, buildTime, revision);
+    }
+    //TODO: might need to parse config info? (e.g, hive settings etc. could consume memory)
+    LOG.debug("Finished parsing tez application");
+  }
+
+  private JSONObject readJson(InputStream in) throws IOException, JSONException {
+    // Buffer the whole stream in memory first, then decode it as UTF-8 and parse the text
+    final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    IOUtils.copy(in, buffer);
+    return new JSONObject(buffer.toString("UTF-8"));
+  }
+
+  /**
+   * Read zip file contents. Every file can contain "dag", "vertices", "tasks", "task_attempts"
+   * and application data; each entry is loaded fully into memory and parsed as one JSON object.
+   * @param atsFile zip file to read; it is closed before this method returns
+   * @throws IOException if the zip file or one of its entries cannot be read
+   * @throws JSONException if an entry does not hold valid JSON
+   */
+  private void parseATSZipFile(File atsFile)
+      throws IOException, JSONException, TezException, InterruptedException {
+    final ZipFile atsZipFile = new ZipFile(atsFile);
+    try {
+      Enumeration<? extends ZipEntry> zipEntries = atsZipFile.entries();
+      while (zipEntries.hasMoreElements()) {
+        ZipEntry zipEntry = zipEntries.nextElement();
+        LOG.info("Processing " + zipEntry.getName());
+        InputStream inputStream = atsZipFile.getInputStream(zipEntry);
+        JSONObject jsonObject = readJson(inputStream);
+
+        //This json can contain any of: dag, vertices, tasks, task_attempts, application
+        JSONObject dagJson = jsonObject.optJSONObject(Constants.DAG);
+        if (dagJson != null) {
+          //TODO: support for multiple dags per ATS file later (for now the last one seen wins).
+          dagInfo = DagInfo.create(dagJson);
+        }
+
+        //Process vertex (section is optional in any given entry)
+        JSONArray vertexJson = jsonObject.optJSONArray(Constants.VERTICES);
+        if (vertexJson != null) {
+          processVertices(vertexJson);
+        }
+
+        //Process task (section is optional in any given entry)
+        JSONArray taskJson = jsonObject.optJSONArray(Constants.TASKS);
+        if (taskJson != null) {
+          processTasks(taskJson);
+        }
+
+        //Process task attempts (section is optional in any given entry)
+        JSONArray attemptsJson = jsonObject.optJSONArray(Constants.TASK_ATTEMPTS);
+        if (attemptsJson != null) {
+          processAttempts(attemptsJson);
+        }
+
+        //Process application (mainly versionInfo)
+        JSONObject tezAppJson = jsonObject.optJSONObject(Constants.APPLICATION);
+        if (tezAppJson != null) {
+          processApplication(tezAppJson);
+        }
+      }
+    } finally {
+      atsZipFile.close(); // also closes the entry streams obtained from getInputStream above
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
new file mode 100644
index 0000000..4d3e96f
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.history.parser;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.history.parser.datamodel.BaseParser;
+import org.apache.tez.history.parser.datamodel.Constants;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Scanner;
+
+/**
+ * Parser utility to parse data generated by SimpleHistoryLogging to in-memory datamodel provided
+ * in org.apache.tez.history.parser.datamodel
+ * <p/>
+ * <p/>
+ * Most of the information should be available. Minor info like VersionInfo may not be available,
+ * as it is not captured in SimpleHistoryLogging.
+ */
+public class SimpleHistoryParser extends BaseParser {
+  private static final Logger LOG = LoggerFactory.getLogger(SimpleHistoryParser.class);
+  private static final String UTF8 = "UTF-8";
+  private final File historyFile;
+
+
+  public SimpleHistoryParser(File historyFile) {
+    super();
+    Preconditions.checkArgument(historyFile.exists(), historyFile + " does not exist");
+    this.historyFile = historyFile;
+  }
+
+  /**
+   * Get in-memory representation of DagInfo for the given DAG in the history file.
+   * @param dagId non-empty DAG id; surrounding whitespace is trimmed before use
+   * @return DagInfo
+   * @throws TezException if the history file cannot be read or parsed, or has no such DAG
+   */
+  public DagInfo getDAGData(String dagId) throws TezException {
+    try {
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(dagId), "Please provide valid dagId");
+      dagId = dagId.trim();
+      parseContents(historyFile, dagId);
+      linkParsedContents();
+      return dagInfo;
+    } catch (IOException e) {
+      LOG.error("Error in reading DAG ", e);
+      throw new TezException(e);
+    } catch (JSONException e) {
+      LOG.error("Error in parsing DAG ", e);
+      throw new TezException(e);
+    }
+  }
+
+  private void populateOtherInfo(JSONObject source, JSONObject destination) throws JSONException {
+    if (source == null || destination == null) {
+      return;
+    }
+    for (Iterator it = source.keys(); it.hasNext(); ) {
+      String key = (String) it.next();
+      Object val = source.get(key);
+      destination.put(key, val);
+    }
+  }
+
+  private void populateOtherInfo(JSONObject source, String entityName,
+      Map<String, JSONObject> destMap) throws JSONException {
+    JSONObject destinationJson = destMap.get(entityName);
+    JSONObject destOtherInfo = destinationJson.getJSONObject(Constants.OTHER_INFO);
+    populateOtherInfo(source, destOtherInfo);
+  }
+
+  /**
+   * Read the history file record by record, merge the other-info of all events belonging to
+   * the same DAG/vertex/task/attempt of the requested DAG, then build the datamodel objects.
+   *
+   * @param historyFile file to read; records are split on the history record separator
+   * @param dagId id of the DAG to extract; records of other DAGs are skipped with a warning
+   * @throws TezException if no record for the requested DAG was found
+   */
+  private void parseContents(File historyFile, String dagId)
+    throws JSONException, FileNotFoundException, TezException {
+    Scanner scanner = new Scanner(historyFile, UTF8);
+    JSONObject dagJson = null;
+    Map<String, JSONObject> vertexJsonMap = Maps.newHashMap();
+    Map<String, JSONObject> taskJsonMap = Maps.newHashMap();
+    Map<String, JSONObject> attemptJsonMap = Maps.newHashMap();
+    try {
+      scanner.useDelimiter(SimpleHistoryLoggingService.RECORD_SEPARATOR);
+      TezDAGID tezDAGID = TezDAGID.fromString(dagId);
+      while (scanner.hasNext()) {
+        String line = scanner.next();
+        JSONObject jsonObject = new JSONObject(line);
+        String entity = jsonObject.getString(Constants.ENTITY);
+        String entityType = jsonObject.getString(Constants.ENTITY_TYPE);
+        if (entityType.equals(Constants.TEZ_DAG_ID)) {
+          if (!dagId.equals(entity)) {
+            LOG.warn(dagId + " is not matching with " + entity);
+            continue;
+          }
+          // Club all DAG related information together (DAG_INIT, DAG_FINISH etc). Each of
+          // them would have a set of entities in otherinfo (e.g vertex mapping, dagPlan,
+          // start/finish time etc).
+          if (dagJson == null) {
+            dagJson = jsonObject;
+          }
+          JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+          JSONObject dagOtherInfo = dagJson.getJSONObject(Constants.OTHER_INFO);
+          populateOtherInfo(otherInfo, dagOtherInfo);
+        } else if (entityType.equals(Constants.TEZ_VERTEX_ID)) {
+          TezVertexID tezVertexID = TezVertexID.fromString(entity);
+          if (!tezDAGID.equals(tezVertexID.getDAGId())) {
+            LOG.warn(entity + " does not belong to " + tezDAGID);
+            continue;
+          }
+          if (!vertexJsonMap.containsKey(entity)) {
+            vertexJsonMap.put(entity, jsonObject);
+          }
+          JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+          populateOtherInfo(otherInfo, entity, vertexJsonMap);
+        } else if (entityType.equals(Constants.TEZ_TASK_ID)) {
+          TezTaskID tezTaskID = TezTaskID.fromString(entity);
+          if (!tezDAGID.equals(tezTaskID.getVertexID().getDAGId())) {
+            LOG.warn(entity + " does not belong to " + tezDAGID);
+            continue;
+          }
+          if (!taskJsonMap.containsKey(entity)) {
+            taskJsonMap.put(entity, jsonObject);
+          }
+          JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+          populateOtherInfo(otherInfo, entity, taskJsonMap);
+        } else if (entityType.equals(Constants.TEZ_TASK_ATTEMPT_ID)) {
+          TezTaskAttemptID tezAttemptId = TezTaskAttemptID.fromString(entity);
+          if (!tezDAGID.equals(tezAttemptId.getTaskID().getVertexID().getDAGId())) {
+            LOG.warn(entity + " does not belong to " + tezDAGID);
+            continue;
+          }
+          if (!attemptJsonMap.containsKey(entity)) {
+            attemptJsonMap.put(entity, jsonObject);
+          }
+          JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+          populateOtherInfo(otherInfo, entity, attemptJsonMap);
+        }
+      }
+    } finally {
+      scanner.close(); // always release the file, even when a record fails to parse
+    }
+    if (dagJson != null) {
+      this.dagInfo = DagInfo.create(dagJson);
+    } else {
+      LOG.error("Dag is not yet parsed. Looks like partial file.");
+      throw new TezException(
+          "Please provide a valid/complete history log file containing " + dagId);
+    }
+    for (JSONObject jsonObject : vertexJsonMap.values()) {
+      VertexInfo vertexInfo = VertexInfo.create(jsonObject);
+      this.vertexList.add(vertexInfo);
+      LOG.debug("Parsed vertex {}", vertexInfo.getVertexName());
+    }
+    for (JSONObject jsonObject : taskJsonMap.values()) {
+      TaskInfo taskInfo = TaskInfo.create(jsonObject);
+      this.taskList.add(taskInfo);
+      LOG.debug("Parsed task {}", taskInfo.getTaskId());
+    }
+    for (JSONObject jsonObject : attemptJsonMap.values()) {
+      // SimpleHistoryLogging keeps nodeId/containerId under "relatedEntities", e.g.
+      // [{"entity":"host:58690","entitytype":"nodeId"},{"entity":"container_...",
+      // "entitytype":"containerId"}]; copy them into otherInfo so they can be parsed later
+      JSONArray relatedEntities = jsonObject.optJSONArray(Constants.RELATED_ENTITIES);
+      if (relatedEntities == null) {
+        //This can happen when CONTAINER_EXITED abruptly. (e.g Container failed, exitCode=1)
+        LOG.debug("entity {} did not have related entities",
+            jsonObject.optString(Constants.ENTITY));
+      } else {
+        JSONObject subJsonObject = relatedEntities.optJSONObject(0);
+        if (subJsonObject != null) {
+          String nodeId = subJsonObject.optString(Constants.ENTITY_TYPE);
+          if (!Strings.isNullOrEmpty(nodeId) && nodeId.equalsIgnoreCase(Constants.NODE_ID)) {
+            //populate it in otherInfo
+            JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+            String nodeIdVal = subJsonObject.optString(Constants.ENTITY);
+            if (otherInfo != null && nodeIdVal != null) {
+              otherInfo.put(Constants.NODE_ID, nodeIdVal);
+            }
+          }
+        }
+
+        subJsonObject = relatedEntities.optJSONObject(1);
+        if (subJsonObject != null) {
+          String containerId = subJsonObject.optString(Constants.ENTITY_TYPE);
+          if (!Strings.isNullOrEmpty(containerId) && containerId
+              .equalsIgnoreCase(Constants.CONTAINER_ID)) {
+            //populate it in otherInfo
+            JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+            String containerIdVal = subJsonObject.optString(Constants.ENTITY);
+            if (otherInfo != null && containerIdVal != null) {
+              otherInfo.put(Constants.CONTAINER_ID, containerIdVal);
+            }
+          }
+        }
+      }
+      TaskAttemptInfo attemptInfo = TaskAttemptInfo.create(jsonObject);
+      this.attemptList.add(attemptInfo);
+      LOG.debug("Parsed task attempt {}", attemptInfo.getTaskAttemptId());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/AdditionalInputOutputDetails.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/AdditionalInputOutputDetails.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/AdditionalInputOutputDetails.java
new file mode 100644
index 0000000..b853d5c
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/AdditionalInputOutputDetails.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Additional input/output information present in DAG.
+ */
+
+@Public
+@Evolving
+public class AdditionalInputOutputDetails {
+  private final String name;
+  private final String clazz;
+  private final String initializer;
+  private final String userPayloadText;
+
+  public AdditionalInputOutputDetails(String name, String clazz, String initializer,
+      String userPayloadText) {
+    this.name = name;
+    this.clazz = clazz;
+    this.initializer = initializer;
+    this.userPayloadText = userPayloadText;
+  }
+
+  public final String getName() {
+    return name;
+  }
+
+  public final String getClazz() {
+    return clazz;
+  }
+
+  public final String getInitializer() {
+    return initializer;
+  }
+
+  public final String getUserPayloadText() {
+    return userPayloadText;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("name=").append(name).append(", ");
+    sb.append("clazz=").append(clazz).append(", ");
+    sb.append("initializer=").append(initializer).append(", ");
+    sb.append("userPayloadText=").append(userPayloadText);
+    sb.append("]");
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java
new file mode 100644
index 0000000..3f9666a
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.history.parser.utils.Utils;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public abstract class BaseInfo {
+
+  protected TezCounters tezCounters;
+  protected List<Event> eventList;
+
+  BaseInfo(JSONObject jsonObject) throws JSONException {
+    final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
+    //parse tez counters
+    tezCounters = Utils.parseTezCountersFromJSON(
+        otherInfoNode.optJSONObject(Constants.COUNTERS));
+
+    //parse events
+    eventList = Lists.newArrayList();
+    Utils.parseEvents(jsonObject.optJSONArray(Constants.EVENTS), eventList);
+  }
+
+  public TezCounters getTezCounters() {
+    return tezCounters;
+  }
+
+  /**
+   * Get start time w.r.t DAG
+   *
+   * @return long
+   */
+  public abstract long getStartTimeInterval();
+
+  /**
+   * Get finish time w.r.t DAG
+   *
+   * @return long
+   */
+  public abstract long getFinishTimeInterval();
+
+  /**
+   * Get absolute start time
+   *
+   * @return long
+   */
+  public abstract long getStartTime();
+
+  /**
+   * Get absolute finish time
+   *
+   * @return long
+   */
+  public abstract long getFinishTime();
+
+  public abstract String getDiagnostics();
+
+  public List<Event> getEvents() {
+    return eventList;
+  }
+
+  /**
+   * Get counter for a specific counter group name.
+   * If counterGroupName is not mentioned (null), the counter is looked up in all groups;
+   * otherwise only the group with exactly that name is consulted.
+   *
+   * @param counterGroupName group to restrict the lookup to, or null for all groups
+   * @param counter name of the counter to find
+   * @return Map<String, TezCounter> counter group name to matching counter; empty if none
+   */
+  public Map<String, TezCounter> getCounter(String counterGroupName, String counter) {
+    //TODO: FS, TaskCounters are directly getting added as TezCounters always pass those.  Need a
+    // way to get rid of these.
+    Map<String, TezCounter> result = Maps.newHashMap();
+    final boolean groupRequested = (counterGroupName != null);
+    Iterator<String> iterator = tezCounters.getGroupNames().iterator();
+    while (iterator.hasNext()) {
+      CounterGroup counterGroup = tezCounters.getGroup(iterator.next());
+      String groupName = counterGroup.getName();
+      if (groupRequested && !groupName.equals(counterGroupName)) {
+        // A specific group was asked for and this is not it. Skipping here keeps counters of
+        // groups that merely precede the requested one out of the result.
+        continue;
+      }
+
+      //Explicitly mention that no need to create the counter if not present
+      TezCounter tezCounter = counterGroup.getUnderlyingGroup().findCounter(counter, false);
+      if (tezCounter != null) {
+        result.put(groupName, tezCounter);
+      }
+
+      if (groupRequested) {
+        //Retrieved counter specific to a counter group. Safe to exit.
+        break;
+      }
+
+    }
+    return result;
+  }
+
+  /**
+   * Find a counter in all counter groups
+   *
+   * @param counter
+   * @return Map of countergroup to TezCounter mapping
+   */
+  public Map<String, TezCounter> getCounter(String counter) {
+    return getCounter(null, counter);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java
new file mode 100644
index 0000000..62ba474
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+
+import java.util.List;
+
+/**
+ * Common state and linking logic for history parsers.
+ *
+ * Holds the DAG, version, vertex, task and task attempt details collected while parsing
+ * (presumably filled in by concrete subclasses, since nothing in this class assigns them) and
+ * wires them together via {@link #linkParsedContents()}.
+ */
+public abstract class BaseParser {
+
+  protected DagInfo dagInfo;
+  protected VersionInfo versionInfo;
+  protected final List<VertexInfo> vertexList;
+  protected final List<TaskInfo> taskList;
+  protected final List<TaskAttemptInfo> attemptList;
+
+
+  public BaseParser() {
+    vertexList = Lists.newLinkedList();
+    taskList = Lists.newLinkedList();
+    attemptList = Lists.newLinkedList();
+  }
+
+  /**
+   * link the parsed contents, so that it becomes easier to iterate from DAG-->Task and Task-->DAG.
+   * e.g Link vertex to dag, task to vertex, attempt to task etc
+   *
+   * Also registers the container of every task attempt with the DAG, sets the DAG's start time
+   * interval as reference time on all vertex/task/attempt events, and attaches the version info
+   * to the DAG.
+   *
+   * @throws IllegalStateException if the vertex of a task or task attempt is not known to the DAG
+   */
+  protected void linkParsedContents() {
+    //Link vertex to DAG
+    for (VertexInfo vertexInfo : vertexList) {
+      vertexInfo.setDagInfo(dagInfo);
+    }
+
+    //Link task to vertex
+    for (TaskInfo taskInfo : taskList) {
+      //Link vertex to task
+      String vertexId = TezTaskID.fromString(taskInfo.getTaskId()).getVertexID().toString();
+      VertexInfo vertexInfo = dagInfo.getVertexFromId(vertexId);
+      Preconditions.checkState(vertexInfo != null, "VertexInfo for " + vertexId + " can't be "
+          + "null");
+      taskInfo.setVertexInfo(vertexInfo);
+    }
+
+    //Link task attempt to task
+    for (TaskAttemptInfo attemptInfo : attemptList) {
+      //Link task to task attempt
+      TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString(attemptInfo
+          .getTaskAttemptId());
+      VertexInfo vertexInfo = dagInfo.getVertexFromId(taskAttemptId.getTaskID()
+          .getVertexID().toString());
+      Preconditions.checkState(vertexInfo != null, "Vertex " + taskAttemptId.getTaskID()
+          .getVertexID().toString() + " is not present in DAG");
+      TaskInfo taskInfo = vertexInfo.getTask(taskAttemptId.getTaskID().toString());
+      attemptInfo.setTaskInfo(taskInfo);
+    }
+
+    //Set container details
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      for (TaskAttemptInfo taskAttemptInfo : vertexInfo.getTaskAttempts()) {
+        dagInfo.addContainerMapping(taskAttemptInfo.getContainer(), taskAttemptInfo);
+      }
+    }
+
+
+    //Set reference time for all events
+    final long referenceTime = dagInfo.getStartTimeInterval();
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      setReferenceTime(vertexInfo.getEvents(), referenceTime);
+      for (TaskInfo taskInfo : vertexInfo.getTasks()) {
+        setReferenceTime(taskInfo.getEvents(), referenceTime);
+        for (TaskAttemptInfo taskAttemptInfo : taskInfo.getTaskAttempts()) {
+          setReferenceTime(taskAttemptInfo.getEvents(), referenceTime);
+        }
+      }
+    }
+
+    dagInfo.setVersionInfo(versionInfo);
+  }
+
+  /**
+   * Set reference time to all events
+   *
+   * @param eventList events to update
+   * @param referenceTime reference time to set on each event
+   */
+  private void setReferenceTime(List<Event> eventList, final long referenceTime) {
+    // Plain loop on purpose: Iterables.all() stops at the first element whose predicate returns
+    // false, which would leave every event after the first one without a reference time.
+    for (Event event : eventList) {
+      event.setReferenceTime(referenceTime);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java
new file mode 100644
index 0000000..dce79e2
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import org.apache.tez.common.ATSConstants;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+ * String constants for the history parser data model, declared on top of the ones inherited
+ * from {@link ATSConstants}.
+ */
+@Private
+public class Constants extends ATSConstants {
+
+  // General event/entity names.
+  // NOTE(review): these look like field names and entity type names found in the history
+  // data being parsed - confirm against the parsers that use them.
+  public static final String EVENT_TIME_STAMP = "timestamp";
+  public static final String TEZ_APPLICATION = "TEZ_APPLICATION";
+  public static final String TEZ_TASK_ID = "TEZ_TASK_ID";
+  public static final String TEZ_TASK_ATTEMPT_ID = "TEZ_TASK_ATTEMPT_ID";
+
+  // Edge related names (presumably attributes of a single edge entry - TODO confirm)
+  public static final String EDGE_ID = "edgeId";
+  public static final String INPUT_VERTEX_NAME = "inputVertexName";
+  public static final String OUTPUT_VERTEX_NAME = "outputVertexName";
+  public static final String DATA_MOVEMENT_TYPE = "dataMovementType";
+  public static final String EDGE_SOURCE_CLASS = "edgeSourceClass";
+  public static final String EDGE_DESTINATION_CLASS = "edgeDestinationClass";
+  public static final String INPUT_PAYLOAD_TEXT = "inputUserPayloadAsText";
+  public static final String OUTPUT_PAYLOAD_TEXT = "outputUserPayloadAsText";
+
+  // Edge lists and additional inputs/outputs (presumably per-DAG / per-vertex - TODO confirm)
+  public static final String EDGES = "edges";
+  public static final String OUT_EDGE_IDS = "outEdgeIds";
+  public static final String IN_EDGE_IDS = "inEdgeIds";
+  public static final String ADDITIONAL_INPUTS = "additionalInputs";
+  public static final String ADDITIONAL_OUTPUTS = "additionalOutputs";
+
+  // Names describing a single named entry with class, initializer and payload
+  // (presumably the entries of the additional inputs/outputs above - TODO confirm)
+  public static final String NAME = "name";
+  public static final String CLASS = "class";
+  public static final String INITIALIZER = "initializer";
+  public static final String USER_PAYLOAD_TEXT = "userPayloadAsText";
+
+  public static final String DAG_CONTEXT = "dagContext";
+
+  //constants for ATS data export
+  public static final String DAG = "dag";
+  public static final String VERTICES = "vertices";
+  public static final String TASKS = "tasks";
+  public static final String TASK_ATTEMPTS = "task_attempts";
+  public static final String APPLICATION = "application";
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Container.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Container.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Container.java
new file mode 100644
index 0000000..4e01d1b
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Container.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Objects;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class Container {
+
+  private final String id;
+  private final String host;
+
+  public Container(String id, String host) {
+    this.id = id;
+    this.host = host;
+  }
+
+  public final String getId() {
+    return id;
+  }
+
+  public final String getHost() {
+    return host;
+  }
+
+  @Override public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("id=").append(id).append(", ");
+    sb.append("host=").append(host);
+    sb.append("]");
+    return sb.toString();
+  }
+
+  @Override public int hashCode() {
+    return Objects.hashCode(id, host);
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    final Container other = (Container) obj;
+    return Objects.equal(this.id, other.id)
+        && Objects.equal(this.host, other.host);
+  }
+}