You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2015/01/14 04:05:29 UTC

[2/4] tez git commit: TEZ-1931. Publish tez version info to Timeline. (hitesh)

TEZ-1931. Publish tez version info to Timeline. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d6d03f9e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d6d03f9e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d6d03f9e

Branch: refs/heads/master
Commit: d6d03f9e85952f05b7d11be4dd5e38befcbbeefe
Parents: 8a491f8
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Jan 13 18:28:45 2015 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Jan 13 18:28:45 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../ats/TestATSHistoryLoggingService.java       | 145 +++++
 .../ats/TestATSHistoryWithMiniCluster.java      | 242 +++++++
 .../ats/TestHistoryEventTimelineConversion.java | 625 +++++++++++++++++++
 .../ats/TestATSHistoryLoggingService.java       | 145 -----
 .../ats/TestATSHistoryWithMiniCluster.java      | 242 -------
 .../ats/TestHistoryEventTimelineConversion.java | 625 -------------------
 7 files changed, 1013 insertions(+), 1012 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d6d03f9e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 25eff15..a4b4a4e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -34,6 +34,7 @@ Release 0.6.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1931. Publish tez version info to Timeline.
   TEZ-1938. Build warning duplicate jersey-json definitions
   TEZ-1910. Build fails against hadoop-2.2.0.
   TEZ-1882. Tez UI build does not work on Windows

http://git-wip-us.apache.org/repos/asf/tez/blob/d6d03f9e/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
new file mode 100644
index 0000000..18ec43e
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for ATSHistoryLoggingService using a mocked TimelineClient,
+ * so no real Timeline server is required. The mock's putEntities answer
+ * counts invocations/entities and sleeps to simulate a slow ATS backend.
+ */
+public class TestATSHistoryLoggingService {
+
+  private static final Log LOG = LogFactory.getLog(TestATSHistoryLoggingService.class);
+
+  // Service under test; recreated for each test in setup().
+  private ATSHistoryLoggingService atsHistoryLoggingService;
+  private AppContext appContext;
+  private Configuration conf;
+  // Number of TimelineClient.putEntities calls observed by the mock.
+  private int atsInvokeCounter;
+  // Total number of TimelineEntity arguments seen across all putEntities calls.
+  private int atsEntitiesCounter;
+  private SystemClock clock = new SystemClock();
+
+  /**
+   * Configures the service with a 1s event-flush timeout and a max batch
+   * size of 2 events, then swaps its timeline client for a mock whose
+   * putEntities answer increments the counters and sleeps 500ms per call.
+   */
+  @Before
+  public void setup() throws Exception {
+    appContext = mock(AppContext.class);
+    atsHistoryLoggingService = new ATSHistoryLoggingService();
+    atsHistoryLoggingService.setAppContext(appContext);
+    conf = new Configuration(false);
+    conf.setLong(TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
+        1000l);
+    conf.setInt(TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, 2);
+    conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+    atsInvokeCounter = 0;
+    atsEntitiesCounter = 0;
+    atsHistoryLoggingService.init(conf);
+    // Replace the real client after init() but before start(), so the
+    // service's dispatch thread only ever talks to the mock.
+    atsHistoryLoggingService.timelineClient = mock(TimelineClient.class);
+    atsHistoryLoggingService.start();
+    when(appContext.getClock()).thenReturn(clock);
+    when(appContext.getCurrentDAGID()).thenReturn(null);
+    when(atsHistoryLoggingService.timelineClient.putEntities(
+        Matchers.<TimelineEntity[]>anyVararg())).thenAnswer(
+        new Answer<Object>() {
+          @Override
+          public Object answer(InvocationOnMock invocation) throws Throwable {
+            ++atsInvokeCounter;
+            // Each vararg argument is one TimelineEntity being published.
+            atsEntitiesCounter += invocation.getArguments().length;
+            try {
+              // Simulate a slow Timeline server so events queue up.
+              Thread.sleep(500l);
+            } catch (InterruptedException e) {
+              // do nothing
+            }
+            return null;
+          }
+        }
+    );
+  }
+
+  @After
+  public void teardown() {
+    atsHistoryLoggingService.stop();
+    atsHistoryLoggingService = null;
+  }
+
+  /**
+   * Queues 100 events against the deliberately slow mock, then stops the
+   * service mid-stream and checks that only a bounded number of entities
+   * were actually sent (some progress, but well short of all 100).
+   */
+  @Test(timeout=20000)
+  public void testATSHistoryLoggingServiceShutdown() {
+    TezDAGID tezDAGID = TezDAGID.getInstance(
+        ApplicationId.newInstance(100l, 1), 1);
+    DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
+        new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1"));
+
+    for (int i = 0; i < 100; ++i) {
+      atsHistoryLoggingService.handle(historyEvent);
+    }
+
+    try {
+      // Let roughly 4-5 batches (500ms each) go through before stopping.
+      Thread.sleep(2500l);
+    } catch (InterruptedException e) {
+      // Do nothing
+    }
+    atsHistoryLoggingService.stop();
+
+    LOG.info("ATS entitiesSent=" + atsEntitiesCounter
+        + ", timelineInvocations=" + atsInvokeCounter);
+
+    // Bounds are loose on purpose: timing-dependent, but must show the
+    // service both made progress and stopped before draining the queue.
+    Assert.assertTrue(atsEntitiesCounter >= 4);
+    Assert.assertTrue(atsEntitiesCounter < 20);
+
+  }
+
+  /**
+   * Verifies batching: with YARN_ATS_MAX_EVENTS_PER_BATCH=2 and a backlog
+   * of queued events, every putEntities call should carry exactly 2
+   * entities, i.e. entities sent == 2 * invocations.
+   */
+  @Test(timeout=20000)
+  public void testATSEventBatching() {
+    TezDAGID tezDAGID = TezDAGID.getInstance(
+        ApplicationId.newInstance(100l, 1), 1);
+    DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
+        new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1"));
+
+    for (int i = 0; i < 100; ++i) {
+      atsHistoryLoggingService.handle(historyEvent);
+    }
+
+    try {
+      Thread.sleep(1000l);
+    } catch (InterruptedException e) {
+      // Do nothing
+    }
+    LOG.info("ATS entitiesSent=" + atsEntitiesCounter
+        + ", timelineInvocations=" + atsInvokeCounter);
+
+    Assert.assertTrue(atsEntitiesCounter > atsInvokeCounter);
+    Assert.assertEquals(atsEntitiesCounter/2, atsInvokeCounter);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/d6d03f9e/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
new file mode 100644
index 0000000..9c4f721
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.io.IOException;
+import java.util.Random;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.runtime.library.processor.SleepProcessor;
+import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
+import org.apache.tez.tests.MiniTezClusterWithTimeline;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+
+/**
+ * End-to-end tests that run SleepProcessor DAGs on a mini Tez cluster
+ * (with Timeline enabled) backed by a MiniDFSCluster, with history
+ * logging routed through ATSHistoryLoggingService.
+ */
+public class TestATSHistoryWithMiniCluster {
+
+  private static final Log LOG = LogFactory.getLog(TestATSHistoryWithMiniCluster.class);
+
+  protected static MiniTezClusterWithTimeline mrrTezCluster = null;
+  protected static MiniDFSCluster dfsCluster = null;
+  // Host:port of the Timeline webapp, resolved in setup(); may stay null.
+  private static String timelineAddress;
+  // Used only to pick a random per-test staging directory under /tmp.
+  private Random random = new Random();
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem remoteFs;
+
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestATSHistoryWithMiniCluster.class.getName() + "-tmpDir";
+
+  /**
+   * Brings up a 2-datanode MiniDFSCluster and a 1-node mini Tez cluster
+   * with the Timeline service enabled, then captures the Timeline webapp
+   * address for later entity lookups.
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
+          .build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    if (mrrTezCluster == null) {
+      try {
+        mrrTezCluster = new MiniTezClusterWithTimeline(TestATSHistoryWithMiniCluster.class.getName(),
+            1, 1, 1, true);
+        // Local conf shadows the static field on purpose: these settings
+        // are only for initializing the mini cluster.
+        Configuration conf = new Configuration();
+        conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+        conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+        conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000);
+        mrrTezCluster.init(conf);
+        mrrTezCluster.start();
+      } catch (Throwable e) {
+        // NOTE(review): startup failure is only logged; later tests will
+        // fail with an NPE on mrrTezCluster instead of a clear error.
+        LOG.info("Failed to start Mini Tez Cluster", e);
+      }
+    }
+    timelineAddress = mrrTezCluster.getConfig().get(
+        YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS);
+    if (timelineAddress != null) {
+      // Hack to handle bug in MiniYARNCluster handling of webapp address
+      timelineAddress = timelineAddress.replace("0.0.0.0", "localhost");
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws InterruptedException {
+    LOG.info("Shutdown invoked");
+    // Grace period so in-flight Timeline publishes can drain first.
+    Thread.sleep(10000);
+    if (mrrTezCluster != null) {
+      mrrTezCluster.stop();
+      mrrTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+  }
+
+  // To be replaced after Timeline has java APIs for domains
+  /**
+   * Fetches a JSON resource from the Timeline REST endpoint and
+   * deserializes it into the given class via Jersey, asserting a 200
+   * response of type application/json along the way.
+   */
+  private <K> K getTimelineData(String url, Class<K> clazz) {
+    Client client = new Client();
+    WebResource resource = client.resource(url);
+
+    ClientResponse response = resource.accept(MediaType.APPLICATION_JSON)
+        .get(ClientResponse.class);
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+
+    K entity = response.getEntity(clazz);
+    Assert.assertNotNull(entity);
+    return entity;
+  }
+
+  /**
+   * Runs a single-vertex SleepProcessor DAG with ATS history logging
+   * enabled and waits for it to SUCCEED. Entity verification against the
+   * Timeline server is currently disabled (see commented call below).
+   */
+  @Test (timeout=50000)
+  public void testSimpleAMACls() throws Exception {
+    TezClient tezSession = null;
+    ApplicationId applicationId;
+    try {
+      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+      DAG dag = DAG.create("TezSleepProcessor");
+      Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+              SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+          Resource.newInstance(256, 1));
+      dag.addVertex(vertex);
+
+      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+      tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+          ATSHistoryLoggingService.class.getName());
+      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+          .nextInt(100000))));
+      remoteFs.mkdirs(remoteStagingDir);
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+      tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
+      tezSession.start();
+
+      applicationId = tezSession.getAppMasterApplicationId();
+
+      DAGClient dagClient = tezSession.submitDAG(dag);
+
+      DAGStatus dagStatus = dagClient.getDAGStatus(null);
+      while (!dagStatus.isCompleted()) {
+        LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+            + dagStatus.getState());
+        Thread.sleep(500l);
+        dagStatus = dagClient.getDAGStatus(null);
+      }
+      Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+
+//    verifyEntityExistence(applicationId);
+  }
+
+  /**
+   * Same DAG run as testSimpleAMACls; presumably intended to diverge once
+   * per-DAG ACL checks are wired up (entity verification is disabled).
+   */
+  @Test (timeout=50000)
+  public void testDAGACls() throws Exception {
+    TezClient tezSession = null;
+    ApplicationId applicationId;
+    try {
+      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+      DAG dag = DAG.create("TezSleepProcessor");
+      Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+              SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+          Resource.newInstance(256, 1));
+      dag.addVertex(vertex);
+
+      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+      tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+          ATSHistoryLoggingService.class.getName());
+      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+          .nextInt(100000))));
+      remoteFs.mkdirs(remoteStagingDir);
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+      tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
+      tezSession.start();
+
+      applicationId = tezSession.getAppMasterApplicationId();
+
+      DAGClient dagClient = tezSession.submitDAG(dag);
+
+      DAGStatus dagStatus = dagClient.getDAGStatus(null);
+      while (!dagStatus.isCompleted()) {
+        LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+            + dagStatus.getState());
+        Thread.sleep(500l);
+        dagStatus = dagClient.getDAGStatus(null);
+      }
+      Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+//    verifyEntityExistence(applicationId);
+  }
+
+  /**
+   * Looks up the TEZ_APPLICATION and TEZ_DAG_ID entities for the given
+   * application on the Timeline REST API and asserts both exist.
+   * NOTE(review): currently unused — both call sites above are commented out.
+   */
+  private void verifyEntityExistence(ApplicationId applicationId) {
+    Assert.assertNotNull(timelineAddress);
+
+    String appUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_APPLICATION/"
+        + "tez_" + applicationId.toString()  + "?fields=otherinfo";
+    LOG.info("Getting timeline entity for tez application: " + appUrl);
+    TimelineEntity appEntity = getTimelineData(appUrl, TimelineEntity.class);
+    Assert.assertNotNull(appEntity);
+
+    TezDAGID tezDAGID = TezDAGID.getInstance(applicationId, 1);
+    String dagUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/"
+        + tezDAGID.toString() + "?fields=otherinfo";
+    LOG.info("Getting timeline entity for tez dag: " + dagUrl);
+    TimelineEntity dagEntity = getTimelineData(dagUrl, TimelineEntity.class);
+    Assert.assertNotNull(dagEntity);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/d6d03f9e/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
new file mode 100644
index 0000000..e1eb3a4
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -0,0 +1,625 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.common.ATSConstants;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.VertexStats;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.logging.EntityTypes;
+import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHistoryEventTimelineConversion {
+
+  private ApplicationAttemptId applicationAttemptId;
+  private ApplicationId applicationId;
+  private String user = "user";
+  private Random random = new Random();
+  private TezDAGID tezDAGID;
+  private TezVertexID tezVertexID;
+  private TezTaskID tezTaskID;
+  private TezTaskAttemptID tezTaskAttemptID;
+  private DAGPlan dagPlan;
+  private ContainerId containerId;
+  private NodeId nodeId;
+
+  /**
+   * Creates a fresh set of random-but-consistent IDs (app -> attempt ->
+   * DAG -> vertex -> task -> attempt), a minimal DAGPlan, and a
+   * container/node pair for each test.
+   */
+  @Before
+  public void setup() {
+    applicationId = ApplicationId.newInstance(9999l, 1);
+    applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+    tezDAGID = TezDAGID.getInstance(applicationId, random.nextInt());
+    tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt());
+    tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt());
+    tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, random.nextInt());
+    dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
+    containerId = ContainerId.newInstance(applicationAttemptId, 111);
+    nodeId = NodeId.newInstance("node", 13435);
+  }
+
+  /**
+   * Smoke test: for every HistoryEventType, builds a representative event
+   * and runs it through convertToTimelineEntity, failing on any event
+   * type the switch does not cover. Events that report
+   * isHistoryEvent() == false are constructed but skipped, so the switch
+   * stays exhaustive without requiring a converter for them.
+   */
+  @Test(timeout = 5000)
+  public void testHandlerExists() throws JSONException {
+    for (HistoryEventType eventType : HistoryEventType.values()) {
+      HistoryEvent event = null;
+      switch (eventType) {
+        case APP_LAUNCHED:
+          event = new AppLaunchedEvent(applicationId, random.nextInt(), random.nextInt(),
+              user, new Configuration(false));
+          break;
+        case AM_LAUNCHED:
+          event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(),
+              user);
+          break;
+        case AM_STARTED:
+          event = new AMStartedEvent(applicationAttemptId, random.nextInt(), user);
+          break;
+        case DAG_SUBMITTED:
+          event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
+              null, user, null);
+          break;
+        case DAG_INITIALIZED:
+          event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
+          break;
+        case DAG_STARTED:
+          event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+          break;
+        case DAG_FINISHED:
+          event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
+              null, null, user, dagPlan.getName(), null);
+          break;
+        case VERTEX_INITIALIZED:
+          event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
+              random.nextInt(), "proc", null);
+          break;
+        case VERTEX_STARTED:
+          event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt());
+          break;
+        case VERTEX_PARALLELISM_UPDATED:
+          event = new VertexParallelismUpdatedEvent(tezVertexID, 1, null, null, null, 1);
+          break;
+        case VERTEX_FINISHED:
+          event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(),
+              random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
+              null, null, null, null);
+          break;
+        case TASK_STARTED:
+          event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());
+          break;
+        case TASK_FINISHED:
+          event = new TaskFinishedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt(),
+              tezTaskAttemptID, TaskState.FAILED, null, null);
+          break;
+        case TASK_ATTEMPT_STARTED:
+          event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
+              nodeId, null, null, "nodeHttpAddress");
+          break;
+        case TASK_ATTEMPT_FINISHED:
+          event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
+              random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null);
+          break;
+        case CONTAINER_LAUNCHED:
+          event = new ContainerLaunchedEvent(containerId, random.nextInt(),
+              applicationAttemptId);
+          break;
+        case CONTAINER_STOPPED:
+          event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
+          break;
+        // The remaining recovery-only events are built with their no-arg
+        // constructors; they are expected to be filtered out below.
+        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+          event = new VertexRecoverableEventsGeneratedEvent();
+          break;
+        case DAG_COMMIT_STARTED:
+          event = new DAGCommitStartedEvent();
+          break;
+        case VERTEX_COMMIT_STARTED:
+          event = new VertexCommitStartedEvent();
+          break;
+        case VERTEX_GROUP_COMMIT_STARTED:
+          event = new VertexGroupCommitStartedEvent();
+          break;
+        case VERTEX_GROUP_COMMIT_FINISHED:
+          event = new VertexGroupCommitFinishedEvent();
+          break;
+        default:
+          // A new HistoryEventType was added without updating this test.
+          Assert.fail("Unhandled event type " + eventType);
+      }
+      if (event == null || !event.isHistoryEvent()) {
+        continue;
+      }
+      HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    }
+  }
+
+  /**
+   * Verifies AppLaunchedEvent conversion: entity id/type, start time set
+   * to the launch time, related entities (user + application id), the
+   * user primary filter, and that the submitted Configuration round-trips
+   * into the CONFIG section of otherinfo.
+   */
+  @Test(timeout = 5000)
+  public void testConvertAppLaunchedEvent() {
+    long launchTime = random.nextLong();
+    long submitTime = random.nextLong();
+    Configuration conf = new Configuration(false);
+    conf.set("foo", "bar");
+    conf.set("applicationId", "1234");
+
+
+    AppLaunchedEvent event = new AppLaunchedEvent(applicationId, launchTime,
+        submitTime, user, conf);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+
+    Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue());
+
+    Assert.assertEquals(EntityTypes.TEZ_APPLICATION.name(), timelineEntity.getEntityType());
+    // Application entity ids are prefixed with "tez_".
+    Assert.assertEquals("tez_" + applicationId.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(2, timelineEntity.getRelatedEntities().size());
+    Assert.assertTrue(timelineEntity.getRelatedEntities().get(ATSConstants.USER).contains(user));
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(ATSConstants.APPLICATION_ID).contains(
+            applicationId.toString()));
+
+    Assert.assertEquals(1, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+
+    Assert.assertEquals(1, timelineEntity.getOtherInfo().size());
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.CONFIG));
+
+    // The config is published as a flat key/value map under CONFIG.
+    Map<String, String> config =
+        (Map<String, String>)timelineEntity.getOtherInfo().get(ATSConstants.CONFIG);
+    Assert.assertEquals(conf.get("foo"), config.get("foo"));
+    Assert.assertEquals(conf.get("applicationId"), config.get("applicationId"));
+  }
+
+  /**
+   * Verifies ContainerLaunchedEvent conversion: container entity id/type,
+   * related container and application-attempt entities, the application-id
+   * primary filter, and a single CONTAINER_LAUNCHED timeline event stamped
+   * with the launch time.
+   */
+  @Test(timeout = 5000)
+  public void testConvertContainerLaunchedEvent() {
+    long launchTime = random.nextLong();
+    ContainerLaunchedEvent event = new ContainerLaunchedEvent(containerId, launchTime,
+        applicationAttemptId);
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+
+    Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals("tez_" + containerId.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(2, timelineEntity.getRelatedEntities().size());
+    Assert.assertTrue(timelineEntity.getRelatedEntities().get(ATSConstants.CONTAINER_ID).contains(
+        containerId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()).contains(
+            "tez_" + applicationAttemptId.toString()));
+
+    Assert.assertEquals(1, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
+        applicationAttemptId.getApplicationId().toString()));
+
+    Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue());
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    Assert.assertEquals(HistoryEventType.CONTAINER_LAUNCHED.name(),
+        timelineEntity.getEvents().get(0).getEventType());
+    Assert.assertEquals(launchTime,
+        timelineEntity.getEvents().get(0).getTimestamp());
+  }
+
+  /**
+   * Verifies DAGSubmittedEvent conversion: DAG entity id/type, the five
+   * related entities (application, application attempt, attempt id,
+   * application id, user), a single DAG_SUBMITTED timeline event at the
+   * submit time, the three primary filters (dag name, application id,
+   * user), and that otherinfo carries the DAG plan plus application id.
+   */
+  @Test(timeout = 5000)
+  public void testConvertDAGSubmittedEvent() {
+    long submitTime = random.nextLong();
+
+    DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan,
+        applicationAttemptId, null, user, null);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(5, timelineEntity.getRelatedEntities().size());
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION.name()).contains(
+            "tez_" + applicationId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name()).contains(
+            "tez_" + applicationAttemptId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(ATSConstants.APPLICATION_ATTEMPT_ID).contains(
+            applicationAttemptId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(ATSConstants.APPLICATION_ID).contains(
+            applicationAttemptId.getApplicationId().toString()));
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(ATSConstants.USER).contains(user));
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(submitTime, timelineEvent.getTimestamp());
+
+    Assert.assertEquals(submitTime, timelineEntity.getStartTime().longValue());
+
+    Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
+
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains(
+            dagPlan.getName()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
+            applicationAttemptId.getApplicationId().toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN));
+    Assert.assertEquals(applicationId.toString(),
+        timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID));
+
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertDAGInitializedEvent() {
+    long initTime = random.nextLong();
+
+    Map<String,TezVertexID> nameIdMap = new HashMap<String, TezVertexID>();
+    nameIdMap.put("foo", tezVertexID);
+
+    DAGInitializedEvent event = new DAGInitializedEvent(tezDAGID, initTime, "user", "dagName",
+        nameIdMap);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.DAG_INITIALIZED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(initTime, timelineEvent.getTimestamp());
+
+    Assert.assertEquals(2, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains("dagName"));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(
+        ATSConstants.VERTEX_NAME_ID_MAPPING));
+    Map<String, String> vIdMap = (Map<String, String>) timelineEntity.getOtherInfo().get(
+        ATSConstants.VERTEX_NAME_ID_MAPPING);
+    Assert.assertEquals(1, vIdMap.size());
+    Assert.assertNotNull(vIdMap.containsKey("foo"));
+    Assert.assertEquals(tezVertexID.toString(), vIdMap.get("foo"));
+
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertDAGFinishedEvent() {
+    long finishTime = random.nextLong();
+    long startTime = random.nextLong();
+    Map<String,Integer> taskStats = new HashMap<String, Integer>();
+    taskStats.put("FOO", 100);
+    taskStats.put("BAR", 200);
+
+    DAGFinishedEvent event = new DAGFinishedEvent(tezDAGID, startTime, finishTime, DAGState.ERROR,
+        "diagnostics", null, user, dagPlan.getName(), taskStats);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(finishTime, timelineEvent.getTimestamp());
+
+    Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains(dagPlan.getName()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.STATUS).contains(
+            DAGState.ERROR.name()));
+
+    Assert.assertEquals(startTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue());
+    Assert.assertEquals(finishTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
+    Assert.assertEquals(finishTime - startTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.COUNTERS));
+    Assert.assertEquals(DAGState.ERROR.name(),
+        timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
+    Assert.assertEquals("diagnostics",
+        timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS));
+
+    Assert.assertEquals(100,
+        ((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue());
+    Assert.assertEquals(200,
+        ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue());
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertVertexInitializedEvent() {
+    long initRequestedTime = random.nextLong();
+    long initedTime = random.nextLong();
+    int numTasks = random.nextInt();
+    VertexInitializedEvent event = new VertexInitializedEvent(tezVertexID, "v1", initRequestedTime,
+        initedTime, numTasks, "proc", null);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(initedTime, timelineEntity.getStartTime().longValue());
+
+    Assert.assertEquals(1, timelineEntity.getRelatedEntities().size());
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_DAG_ID.name()).contains(
+            tezDAGID.toString()));
+
+    Assert.assertEquals(2, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
+            applicationId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
+            tezDAGID.toString()));
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.VERTEX_INITIALIZED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(initedTime, timelineEvent.getTimestamp());
+
+    Assert.assertEquals("v1", timelineEntity.getOtherInfo().get(ATSConstants.VERTEX_NAME));
+    Assert.assertEquals("proc", timelineEntity.getOtherInfo().get(ATSConstants.PROCESSOR_CLASS_NAME));
+
+    Assert.assertEquals(initedTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue());
+    Assert.assertEquals(initRequestedTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_REQUESTED_TIME)).longValue());
+    Assert.assertEquals(initedTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue());
+    Assert.assertEquals(numTasks,
+        ((Integer)timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)).intValue());
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertVertexFinishedEvent() {
+    long initRequestedTime = random.nextLong();
+    long initedTime = random.nextLong();
+    long startRequestedTime = random.nextLong();
+    long startTime = random.nextLong();
+    long finishTime = random.nextLong();
+    Map<String,Integer> taskStats = new HashMap<String, Integer>();
+    taskStats.put("FOO", 100);
+    taskStats.put("BAR", 200);
+    VertexStats vertexStats = new VertexStats();
+
+    VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", 1,initRequestedTime,
+        initedTime, startRequestedTime, startTime, finishTime, VertexState.ERROR,
+        "diagnostics", null, vertexStats, taskStats);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
+
+    Assert.assertEquals(2, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
+            tezDAGID.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.STATUS).contains(
+            VertexState.ERROR.name()));
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.VERTEX_FINISHED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(finishTime, timelineEvent.getTimestamp());
+
+    Assert.assertEquals(finishTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
+    Assert.assertEquals(finishTime - startTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
+    Assert.assertEquals(VertexState.ERROR.name(),
+        timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
+    Assert.assertEquals("diagnostics",
+        timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS));
+
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.STATS));
+
+    Assert.assertEquals(100,
+        ((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue());
+    Assert.assertEquals(200,
+        ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue());
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertTaskStartedEvent() {
+    long scheduleTime = random.nextLong();
+    long startTime = random.nextLong();
+    TaskStartedEvent event = new TaskStartedEvent(tezTaskID, "v1", scheduleTime, startTime);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(startTime, timelineEntity.getStartTime().longValue());
+
+    Assert.assertEquals(1, timelineEntity.getRelatedEntities().size());
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_VERTEX_ID.name()).contains(
+            tezVertexID.toString()));
+
+    Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
+            applicationId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
+            tezDAGID.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_VERTEX_ID.name()).contains(
+            tezVertexID.toString()));
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.TASK_STARTED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(startTime, timelineEvent.getTimestamp());
+
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.SCHEDULED_TIME));
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.START_TIME));
+
+    Assert.assertEquals(scheduleTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULED_TIME)).longValue());
+    Assert.assertEquals(startTime,
+        ((Long)timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue());
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertTaskAttemptStartedEvent() {
+    long startTime = random.nextLong();
+    TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1",
+        startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress");
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(startTime, timelineEntity.getStartTime().longValue());
+
+    Assert.assertEquals(3, timelineEntity.getRelatedEntities().size());
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(ATSConstants.NODE_ID).contains(nodeId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(ATSConstants.CONTAINER_ID).contains(
+            containerId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_TASK_ID.name()).contains(
+            tezTaskID.toString()));
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.TASK_ATTEMPT_STARTED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(startTime, timelineEvent.getTimestamp());
+
+    Assert.assertEquals(4, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
+            applicationId.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
+            tezDAGID.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_VERTEX_ID.name()).contains(
+            tezVertexID.toString()));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_TASK_ID.name()).contains(
+            tezTaskID.toString()));
+
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.START_TIME));
+    Assert.assertEquals("inProgressURL",
+        timelineEntity.getOtherInfo().get(ATSConstants.IN_PROGRESS_LOGS_URL));
+    Assert.assertEquals("logsURL",
+        timelineEntity.getOtherInfo().get(ATSConstants.COMPLETED_LOGS_URL));
+    Assert.assertEquals(nodeId.toString(),
+        timelineEntity.getOtherInfo().get(ATSConstants.NODE_ID));
+    Assert.assertEquals(containerId.toString(),
+        timelineEntity.getOtherInfo().get(ATSConstants.CONTAINER_ID));
+    Assert.assertEquals("nodeHttpAddress",
+        timelineEntity.getOtherInfo().get(ATSConstants.NODE_HTTP_ADDRESS));
+  }
+
+  @Test(timeout = 5000)
+  public void testConvertVertexParallelismUpdatedEvent() {
+    TezVertexID vId = tezVertexID;
+    Map<String, EdgeManagerPluginDescriptor> edgeMgrs =
+        new HashMap<String, EdgeManagerPluginDescriptor>();
+    edgeMgrs.put("a", EdgeManagerPluginDescriptor.create("a.class").setHistoryText("text"));
+    VertexParallelismUpdatedEvent event = new VertexParallelismUpdatedEvent(vId, 1, null,
+        edgeMgrs, null, 10);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(ATSConstants.TEZ_VERTEX_ID, timelineEntity.getEntityType());
+    Assert.assertEquals(vId.toString(), timelineEntity.getEntityId());
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+
+    TimelineEvent evt = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name(), evt.getEventType());
+    Assert.assertEquals(1, evt.getEventInfo().get(ATSConstants.NUM_TASKS));
+    Assert.assertEquals(10, evt.getEventInfo().get(ATSConstants.OLD_NUM_TASKS));
+    Assert.assertNotNull(evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS));
+
+    Map<String, Object> updatedEdgeMgrs = (Map<String, Object>)
+        evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS);
+    Assert.assertEquals(1, updatedEdgeMgrs.size());
+    Assert.assertTrue(updatedEdgeMgrs.containsKey("a"));
+    Map<String, Object> updatedEdgeMgr = (Map<String, Object>) updatedEdgeMgrs.get("a");
+
+    Assert.assertEquals("a.class", updatedEdgeMgr.get(DAGUtils.EDGE_MANAGER_CLASS_KEY));
+
+    Assert.assertEquals(1, timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS));
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/d6d03f9e/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
deleted file mode 100644
index 18ec43e..0000000
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.logging.ats;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
-import org.apache.tez.dag.records.TezDAGID;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestATSHistoryLoggingService {
-
-  private static final Log LOG = LogFactory.getLog(TestATSHistoryLoggingService.class);
-
-  private ATSHistoryLoggingService atsHistoryLoggingService;
-  private AppContext appContext;
-  private Configuration conf;
-  private int atsInvokeCounter;
-  private int atsEntitiesCounter;
-  private SystemClock clock = new SystemClock();
-
-  @Before
-  public void setup() throws Exception {
-    appContext = mock(AppContext.class);
-    atsHistoryLoggingService = new ATSHistoryLoggingService();
-    atsHistoryLoggingService.setAppContext(appContext);
-    conf = new Configuration(false);
-    conf.setLong(TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
-        1000l);
-    conf.setInt(TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, 2);
-    conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
-    atsInvokeCounter = 0;
-    atsEntitiesCounter = 0;
-    atsHistoryLoggingService.init(conf);
-    atsHistoryLoggingService.timelineClient = mock(TimelineClient.class);
-    atsHistoryLoggingService.start();
-    when(appContext.getClock()).thenReturn(clock);
-    when(appContext.getCurrentDAGID()).thenReturn(null);
-    when(atsHistoryLoggingService.timelineClient.putEntities(
-        Matchers.<TimelineEntity[]>anyVararg())).thenAnswer(
-        new Answer<Object>() {
-          @Override
-          public Object answer(InvocationOnMock invocation) throws Throwable {
-            ++atsInvokeCounter;
-            atsEntitiesCounter += invocation.getArguments().length;
-            try {
-              Thread.sleep(500l);
-            } catch (InterruptedException e) {
-              // do nothing
-            }
-            return null;
-          }
-        }
-    );
-  }
-
-  @After
-  public void teardown() {
-    atsHistoryLoggingService.stop();
-    atsHistoryLoggingService = null;
-  }
-
-  @Test(timeout=20000)
-  public void testATSHistoryLoggingServiceShutdown() {
-    TezDAGID tezDAGID = TezDAGID.getInstance(
-        ApplicationId.newInstance(100l, 1), 1);
-    DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
-        new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1"));
-
-    for (int i = 0; i < 100; ++i) {
-      atsHistoryLoggingService.handle(historyEvent);
-    }
-
-    try {
-      Thread.sleep(2500l);
-    } catch (InterruptedException e) {
-      // Do nothing
-    }
-    atsHistoryLoggingService.stop();
-
-    LOG.info("ATS entitiesSent=" + atsEntitiesCounter
-        + ", timelineInvocations=" + atsInvokeCounter);
-
-    Assert.assertTrue(atsEntitiesCounter >= 4);
-    Assert.assertTrue(atsEntitiesCounter < 20);
-
-  }
-
-  @Test(timeout=20000)
-  public void testATSEventBatching() {
-    TezDAGID tezDAGID = TezDAGID.getInstance(
-        ApplicationId.newInstance(100l, 1), 1);
-    DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
-        new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1"));
-
-    for (int i = 0; i < 100; ++i) {
-      atsHistoryLoggingService.handle(historyEvent);
-    }
-
-    try {
-      Thread.sleep(1000l);
-    } catch (InterruptedException e) {
-      // Do nothing
-    }
-    LOG.info("ATS entitiesSent=" + atsEntitiesCounter
-        + ", timelineInvocations=" + atsInvokeCounter);
-
-    Assert.assertTrue(atsEntitiesCounter > atsInvokeCounter);
-    Assert.assertEquals(atsEntitiesCounter/2, atsInvokeCounter);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/d6d03f9e/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
deleted file mode 100644
index 9c4f721..0000000
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.logging.ats;
-
-import java.io.IOException;
-import java.util.Random;
-
-import javax.ws.rs.core.MediaType;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.runtime.library.processor.SleepProcessor;
-import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
-import org.apache.tez.tests.MiniTezClusterWithTimeline;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-
-public class TestATSHistoryWithMiniCluster {
-
-  private static final Log LOG = LogFactory.getLog(TestATSHistoryWithMiniCluster.class);
-
-  protected static MiniTezClusterWithTimeline mrrTezCluster = null;
-  protected static MiniDFSCluster dfsCluster = null;
-  private static String timelineAddress;
-  private Random random = new Random();
-
-  private static Configuration conf = new Configuration();
-  private static FileSystem remoteFs;
-
-  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
-      + TestATSHistoryWithMiniCluster.class.getName() + "-tmpDir";
-
-  @BeforeClass
-  public static void setup() throws IOException {
-    try {
-      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
-      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
-          .build();
-      remoteFs = dfsCluster.getFileSystem();
-    } catch (IOException io) {
-      throw new RuntimeException("problem starting mini dfs cluster", io);
-    }
-
-    if (mrrTezCluster == null) {
-      try {
-        mrrTezCluster = new MiniTezClusterWithTimeline(TestATSHistoryWithMiniCluster.class.getName(),
-            1, 1, 1, true);
-        Configuration conf = new Configuration();
-        conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
-        conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
-        conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000);
-        mrrTezCluster.init(conf);
-        mrrTezCluster.start();
-      } catch (Throwable e) {
-        LOG.info("Failed to start Mini Tez Cluster", e);
-      }
-    }
-    timelineAddress = mrrTezCluster.getConfig().get(
-        YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS);
-    if (timelineAddress != null) {
-      // Hack to handle bug in MiniYARNCluster handling of webapp address
-      timelineAddress = timelineAddress.replace("0.0.0.0", "localhost");
-    }
-  }
-
-  @AfterClass
-  public static void tearDown() throws InterruptedException {
-    LOG.info("Shutdown invoked");
-    Thread.sleep(10000);
-    if (mrrTezCluster != null) {
-      mrrTezCluster.stop();
-      mrrTezCluster = null;
-    }
-    if (dfsCluster != null) {
-      dfsCluster.shutdown();
-      dfsCluster = null;
-    }
-  }
-
-  // To be replaced after Timeline has java APIs for domains
-  private <K> K getTimelineData(String url, Class<K> clazz) {
-    Client client = new Client();
-    WebResource resource = client.resource(url);
-
-    ClientResponse response = resource.accept(MediaType.APPLICATION_JSON)
-        .get(ClientResponse.class);
-    Assert.assertEquals(200, response.getStatus());
-    Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
-
-    K entity = response.getEntity(clazz);
-    Assert.assertNotNull(entity);
-    return entity;
-  }
-
-  @Test (timeout=50000)
-  public void testSimpleAMACls() throws Exception {
-    TezClient tezSession = null;
-    ApplicationId applicationId;
-    try {
-      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
-      DAG dag = DAG.create("TezSleepProcessor");
-      Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
-              SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-          Resource.newInstance(256, 1));
-      dag.addVertex(vertex);
-
-      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-      tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
-      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
-          ATSHistoryLoggingService.class.getName());
-      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
-          .nextInt(100000))));
-      remoteFs.mkdirs(remoteStagingDir);
-      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-
-      tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
-      tezSession.start();
-
-      applicationId = tezSession.getAppMasterApplicationId();
-
-      DAGClient dagClient = tezSession.submitDAG(dag);
-
-      DAGStatus dagStatus = dagClient.getDAGStatus(null);
-      while (!dagStatus.isCompleted()) {
-        LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-            + dagStatus.getState());
-        Thread.sleep(500l);
-        dagStatus = dagClient.getDAGStatus(null);
-      }
-      Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-    } finally {
-      if (tezSession != null) {
-        tezSession.stop();
-      }
-    }
-
-//    verifyEntityExistence(applicationId);
-  }
-
-  @Test (timeout=50000)
-  public void testDAGACls() throws Exception {
-    TezClient tezSession = null;
-    ApplicationId applicationId;
-    try {
-      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
-      DAG dag = DAG.create("TezSleepProcessor");
-      Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
-              SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-          Resource.newInstance(256, 1));
-      dag.addVertex(vertex);
-
-      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-      tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
-      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
-          ATSHistoryLoggingService.class.getName());
-      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
-          .nextInt(100000))));
-      remoteFs.mkdirs(remoteStagingDir);
-      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-
-      tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
-      tezSession.start();
-
-      applicationId = tezSession.getAppMasterApplicationId();
-
-      DAGClient dagClient = tezSession.submitDAG(dag);
-
-      DAGStatus dagStatus = dagClient.getDAGStatus(null);
-      while (!dagStatus.isCompleted()) {
-        LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-            + dagStatus.getState());
-        Thread.sleep(500l);
-        dagStatus = dagClient.getDAGStatus(null);
-      }
-      Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-    } finally {
-      if (tezSession != null) {
-        tezSession.stop();
-      }
-    }
-//    verifyEntityExistence(applicationId);
-  }
-
-  private void verifyEntityExistence(ApplicationId applicationId) {
-    Assert.assertNotNull(timelineAddress);
-
-    String appUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_APPLICATION/"
-        + "tez_" + applicationId.toString()  + "?fields=otherinfo";
-    LOG.info("Getting timeline entity for tez application: " + appUrl);
-    TimelineEntity appEntity = getTimelineData(appUrl, TimelineEntity.class);
-    Assert.assertNotNull(appEntity);
-
-    TezDAGID tezDAGID = TezDAGID.getInstance(applicationId, 1);
-    String dagUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/"
-        + tezDAGID.toString() + "?fields=otherinfo";
-    LOG.info("Getting timeline entity for tez dag: " + dagUrl);
-    TimelineEntity dagEntity = getTimelineData(dagUrl, TimelineEntity.class);
-    Assert.assertNotNull(dagEntity);
-  }
-
-
-}