You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2015/01/14 04:05:29 UTC
[2/4] tez git commit: TEZ-1931. Publish tez version info to Timeline.
(hitesh)
TEZ-1931. Publish tez version info to Timeline. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d6d03f9e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d6d03f9e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d6d03f9e
Branch: refs/heads/master
Commit: d6d03f9e85952f05b7d11be4dd5e38befcbbeefe
Parents: 8a491f8
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Jan 13 18:28:45 2015 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Jan 13 18:28:45 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../ats/TestATSHistoryLoggingService.java | 145 +++++
.../ats/TestATSHistoryWithMiniCluster.java | 242 +++++++
.../ats/TestHistoryEventTimelineConversion.java | 625 +++++++++++++++++++
.../ats/TestATSHistoryLoggingService.java | 145 -----
.../ats/TestATSHistoryWithMiniCluster.java | 242 -------
.../ats/TestHistoryEventTimelineConversion.java | 625 -------------------
7 files changed, 1013 insertions(+), 1012 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/d6d03f9e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 25eff15..a4b4a4e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -34,6 +34,7 @@ Release 0.6.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1931. Publish tez version info to Timeline.
TEZ-1938. Build warning duplicate jersey-json definitions
TEZ-1910. Build fails against hadoop-2.2.0.
TEZ-1882. Tez UI build does not work on Windows
http://git-wip-us.apache.org/repos/asf/tez/blob/d6d03f9e/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
new file mode 100644
index 0000000..18ec43e
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestATSHistoryLoggingService {
+
+ private static final Log LOG = LogFactory.getLog(TestATSHistoryLoggingService.class);
+
+ private ATSHistoryLoggingService atsHistoryLoggingService;
+ private AppContext appContext;
+ private Configuration conf;
+  // Both counters are written inside the stubbed putEntities() Answer and read by the test
+  // methods after a sleep. The sleep-then-read pattern suggests the Answer runs on a thread
+  // other than the test thread (TODO confirm against ATSHistoryLoggingService); volatile makes
+  // the updates visible to whichever thread reads them.
+  private volatile int atsInvokeCounter;
+  private volatile int atsEntitiesCounter;
+ private SystemClock clock = new SystemClock();
+
+  /**
+   * Creates the service under test with mocked collaborators before each test.
+   *
+   * The configuration sets a 1000 ms flush timeout and a maximum of 2 events per batch. Each
+   * stubbed {@code putEntities} call increments {@code atsInvokeCounter}, adds the number of
+   * invocation arguments to {@code atsEntitiesCounter}, and then sleeps for 500 ms.
+   *
+   * @throws Exception if the service cannot be initialized or started
+   */
+  @Before
+  public void setup() throws Exception {
+    appContext = mock(AppContext.class);
+    atsHistoryLoggingService = new ATSHistoryLoggingService();
+    atsHistoryLoggingService.setAppContext(appContext);
+    conf = new Configuration(false);
+    conf.setLong(TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
+        1000L);
+    conf.setInt(TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, 2);
+    conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+    atsInvokeCounter = 0;
+    atsEntitiesCounter = 0;
+    atsHistoryLoggingService.init(conf);
+    // The mock client is installed between init() and start(), exactly as before.
+    atsHistoryLoggingService.timelineClient = mock(TimelineClient.class);
+    atsHistoryLoggingService.start();
+    when(appContext.getClock()).thenReturn(clock);
+    when(appContext.getCurrentDAGID()).thenReturn(null);
+    when(atsHistoryLoggingService.timelineClient.putEntities(
+        Matchers.<TimelineEntity[]>anyVararg())).thenAnswer(
+        new Answer<Object>() {
+          @Override
+          public Object answer(InvocationOnMock invocation) throws Throwable {
+            ++atsInvokeCounter;
+            atsEntitiesCounter += invocation.getArguments().length;
+            try {
+              Thread.sleep(500L);
+            } catch (InterruptedException e) {
+              // Still return normally (best effort), but restore the interrupt flag so the
+              // calling thread can observe that it was interrupted instead of losing it.
+              Thread.currentThread().interrupt();
+            }
+            return null;
+          }
+        }
+    );
+  }
+
+  /**
+   * Stops the service after each test and drops the reference to it.
+   *
+   * The null check keeps this method from throwing a NullPointerException when
+   * {@code setup()} failed before the service was created.
+   */
+  @After
+  public void teardown() {
+    if (atsHistoryLoggingService != null) {
+      atsHistoryLoggingService.stop();
+      atsHistoryLoggingService = null;
+    }
+  }
+
+  /**
+   * Hands 100 copies of a DAG-started event to the service, waits 2.5 seconds, stops the
+   * service and checks that the number of entities passed to the mocked timeline client is
+   * at least 4 and less than 20.
+   *
+   * @throws InterruptedException if the test thread is interrupted while waiting; the test
+   *         then fails instead of silently running its assertions early
+   */
+  @Test(timeout=20000)
+  public void testATSHistoryLoggingServiceShutdown() throws InterruptedException {
+    TezDAGID tezDAGID = TezDAGID.getInstance(
+        ApplicationId.newInstance(100L, 1), 1);
+    DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
+        new DAGStartedEvent(tezDAGID, 1001L, "user1", "dagName1"));
+
+    for (int i = 0; i < 100; ++i) {
+      atsHistoryLoggingService.handle(historyEvent);
+    }
+
+    Thread.sleep(2500L);
+    atsHistoryLoggingService.stop();
+
+    // Read the counter once so the log line, the checks and the messages agree.
+    int entitiesSent = atsEntitiesCounter;
+    LOG.info("ATS entitiesSent=" + entitiesSent
+        + ", timelineInvocations=" + atsInvokeCounter);
+
+    Assert.assertTrue("Expected at least 4 entities to be sent, but got " + entitiesSent,
+        entitiesSent >= 4);
+    Assert.assertTrue("Expected fewer than 20 entities to be sent, but got " + entitiesSent,
+        entitiesSent < 20);
+  }
+
+  /**
+   * Hands 100 copies of a DAG-started event to the service, waits one second and checks that
+   * the mocked timeline client received more entities than invocations, with exactly two
+   * entities per invocation (the configured maximum batch size is 2).
+   *
+   * @throws InterruptedException if the test thread is interrupted while waiting; the test
+   *         then fails instead of silently running its assertions early
+   */
+  @Test(timeout=20000)
+  public void testATSEventBatching() throws InterruptedException {
+    TezDAGID tezDAGID = TezDAGID.getInstance(
+        ApplicationId.newInstance(100L, 1), 1);
+    DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
+        new DAGStartedEvent(tezDAGID, 1001L, "user1", "dagName1"));
+
+    for (int i = 0; i < 100; ++i) {
+      atsHistoryLoggingService.handle(historyEvent);
+    }
+
+    Thread.sleep(1000L);
+
+    // Snapshot both counters once so the log line and the assertions see the same values.
+    int entitiesSent = atsEntitiesCounter;
+    int invocations = atsInvokeCounter;
+    LOG.info("ATS entitiesSent=" + entitiesSent
+        + ", timelineInvocations=" + invocations);
+
+    Assert.assertTrue("Expected more entities (" + entitiesSent + ") than invocations ("
+        + invocations + ")", entitiesSent > invocations);
+    Assert.assertEquals(entitiesSent / 2, invocations);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/d6d03f9e/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
new file mode 100644
index 0000000..9c4f721
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.io.IOException;
+import java.util.Random;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.runtime.library.processor.SleepProcessor;
+import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
+import org.apache.tez.tests.MiniTezClusterWithTimeline;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+
+public class TestATSHistoryWithMiniCluster {
+
+ private static final Log LOG = LogFactory.getLog(TestATSHistoryWithMiniCluster.class);
+
+ protected static MiniTezClusterWithTimeline mrrTezCluster = null;
+ protected static MiniDFSCluster dfsCluster = null;
+ private static String timelineAddress;
+ private Random random = new Random();
+
+ private static Configuration conf = new Configuration();
+ private static FileSystem remoteFs;
+
+ private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+ + TestATSHistoryWithMiniCluster.class.getName() + "-tmpDir";
+
+  /**
+   * Starts a two-datanode mini DFS cluster and, if none exists yet, a mini Tez cluster with
+   * the timeline service enabled, then records the timeline web address for the tests.
+   *
+   * A failure to start either cluster is rethrown as a RuntimeException so the class fails
+   * immediately with the real cause instead of running tests against a broken cluster.
+   *
+   * @throws IOException declared for compatibility with the original signature
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
+          .build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    if (mrrTezCluster == null) {
+      try {
+        mrrTezCluster = new MiniTezClusterWithTimeline(TestATSHistoryWithMiniCluster.class.getName(),
+            1, 1, 1, true);
+        // Separate configuration for the Tez cluster; named so it does not shadow the static
+        // 'conf' field used for the DFS cluster above.
+        Configuration tezClusterConf = new Configuration();
+        tezClusterConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+        tezClusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+        tezClusterConf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000);
+        mrrTezCluster.init(tezClusterConf);
+        mrrTezCluster.start();
+      } catch (Throwable e) {
+        // Same policy as the DFS branch above: do not continue with a half-started cluster.
+        throw new RuntimeException("Failed to start Mini Tez Cluster", e);
+      }
+    }
+    timelineAddress = mrrTezCluster.getConfig().get(
+        YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS);
+    if (timelineAddress != null) {
+      // Hack to handle bug in MiniYARNCluster handling of webapp address
+      timelineAddress = timelineAddress.replace("0.0.0.0", "localhost");
+    }
+  }
+
+  /**
+   * Waits ten seconds, then stops the mini Tez cluster and shuts down the mini DFS cluster.
+   *
+   * The nested try/finally blocks make sure each cluster is still shut down when the wait is
+   * interrupted or when stopping the Tez cluster throws.
+   *
+   * @throws InterruptedException if the initial wait is interrupted
+   */
+  @AfterClass
+  public static void tearDown() throws InterruptedException {
+    LOG.info("Shutdown invoked");
+    try {
+      // NOTE(review): presumably a grace period for pending work to drain - confirm intent.
+      Thread.sleep(10000);
+    } finally {
+      try {
+        if (mrrTezCluster != null) {
+          mrrTezCluster.stop();
+          mrrTezCluster = null;
+        }
+      } finally {
+        if (dfsCluster != null) {
+          dfsCluster.shutdown();
+          dfsCluster = null;
+        }
+      }
+    }
+  }
+
+  /**
+   * Fetches {@code url} with a JSON Accept header and returns the response body converted to
+   * {@code clazz}. Asserts that the status is 200, the content type is JSON and the entity is
+   * not null. The HTTP client is destroyed before returning, whether or not an assertion fails.
+   *
+   * To be replaced after Timeline has java APIs for domains.
+   *
+   * @param url   absolute URL of the timeline REST resource
+   * @param clazz type the response entity is converted to
+   * @return the non-null response entity
+   */
+  private <K> K getTimelineData(String url, Class<K> clazz) {
+    Client client = new Client();
+    try {
+      WebResource resource = client.resource(url);
+
+      ClientResponse response = resource.accept(MediaType.APPLICATION_JSON)
+          .get(ClientResponse.class);
+      Assert.assertEquals(200, response.getStatus());
+      Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+
+      K entity = response.getEntity(clazz);
+      Assert.assertNotNull(entity);
+      return entity;
+    } finally {
+      // Release the resources held by the client; the original never released them.
+      client.destroy();
+    }
+  }
+
+  /**
+   * Runs a single-vertex sleep DAG in a Tez session configured with
+   * {@link ATSHistoryLoggingService} and asserts that the DAG succeeds.
+   */
+  @Test (timeout=50000)
+  public void testSimpleAMACls() throws Exception {
+    ApplicationId applicationId = runSleepDagAndAssertSuccess();
+    // NOTE(review): the timeline lookup was already disabled in the original test.
+//    verifyEntityExistence(applicationId);
+  }
+
+  /**
+   * Runs a single-vertex sleep DAG in a Tez session configured with
+   * {@link ATSHistoryLoggingService} and asserts that the DAG succeeds.
+   */
+  @Test (timeout=50000)
+  public void testDAGACls() throws Exception {
+    ApplicationId applicationId = runSleepDagAndAssertSuccess();
+    // NOTE(review): the timeline lookup was already disabled in the original test.
+//    verifyEntityExistence(applicationId);
+  }
+
+  /**
+   * Shared body of the two ACL tests, which were identical copies of each other.
+   *
+   * Builds a DAG with one SleepProcessor vertex, starts a Tez session that uses a fresh random
+   * staging directory under /tmp on the mini DFS, submits the DAG, polls its status every
+   * 500 ms until it completes and asserts that it SUCCEEDED. The session is always stopped.
+   *
+   * @return the application id of the Tez session's application master
+   * @throws Exception if the session, the DAG submission or a status poll fails
+   */
+  private ApplicationId runSleepDagAndAssertSuccess() throws Exception {
+    TezClient tezSession = null;
+    try {
+      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+      DAG dag = DAG.create("TezSleepProcessor");
+      Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+          SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+          Resource.newInstance(256, 1));
+      dag.addVertex(vertex);
+
+      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+      tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+          ATSHistoryLoggingService.class.getName());
+      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+          .nextInt(100000))));
+      remoteFs.mkdirs(remoteStagingDir);
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+      tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
+      tezSession.start();
+
+      ApplicationId applicationId = tezSession.getAppMasterApplicationId();
+
+      DAGClient dagClient = tezSession.submitDAG(dag);
+
+      DAGStatus dagStatus = dagClient.getDAGStatus(null);
+      while (!dagStatus.isCompleted()) {
+        LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+            + dagStatus.getState());
+        Thread.sleep(500L);
+        dagStatus = dagClient.getDAGStatus(null);
+      }
+      Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+      return applicationId;
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+  }
+
+  /**
+   * Looks up the TEZ_APPLICATION entity of the given application and the TEZ_DAG_ID entity of
+   * its DAG number 1 through the timeline REST API and asserts that both exist.
+   *
+   * @param applicationId application whose timeline entities are expected to be present
+   */
+  private void verifyEntityExistence(ApplicationId applicationId) {
+    Assert.assertNotNull(timelineAddress);
+    final String timelineHost = "http://" + timelineAddress;
+
+    // Application-level entity.
+    final String appEntityUrl = timelineHost + "/ws/v1/timeline/TEZ_APPLICATION/"
+        + "tez_" + applicationId.toString() + "?fields=otherinfo";
+    LOG.info("Getting timeline entity for tez application: " + appEntityUrl);
+    final TimelineEntity appTimelineEntity = getTimelineData(appEntityUrl, TimelineEntity.class);
+    Assert.assertNotNull(appTimelineEntity);
+
+    // DAG-level entity.
+    final TezDAGID firstDagId = TezDAGID.getInstance(applicationId, 1);
+    final String dagEntityUrl = timelineHost + "/ws/v1/timeline/TEZ_DAG_ID/"
+        + firstDagId.toString() + "?fields=otherinfo";
+    LOG.info("Getting timeline entity for tez dag: " + dagEntityUrl);
+    final TimelineEntity dagTimelineEntity = getTimelineData(dagEntityUrl, TimelineEntity.class);
+    Assert.assertNotNull(dagTimelineEntity);
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/d6d03f9e/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
new file mode 100644
index 0000000..e1eb3a4
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -0,0 +1,625 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.common.ATSConstants;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.VertexStats;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.logging.EntityTypes;
+import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHistoryEventTimelineConversion {
+
+ private ApplicationAttemptId applicationAttemptId;
+ private ApplicationId applicationId;
+ private String user = "user";
+ private Random random = new Random();
+ private TezDAGID tezDAGID;
+ private TezVertexID tezVertexID;
+ private TezTaskID tezTaskID;
+ private TezTaskAttemptID tezTaskAttemptID;
+ private DAGPlan dagPlan;
+ private ContainerId containerId;
+ private NodeId nodeId;
+
+  /**
+   * Builds the identifier fixtures shared by all tests: a fixed application and attempt id,
+   * DAG/vertex/task/attempt ids with random numeric parts, a minimal DAG plan, a container id
+   * and a node id.
+   */
+  @Before
+  public void setup() {
+    // Application-level ids.
+    final ApplicationId appId = ApplicationId.newInstance(9999l, 1);
+    final ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+    this.applicationId = appId;
+    this.applicationAttemptId = attemptId;
+
+    // DAG -> vertex -> task -> task attempt, each derived from its parent id.
+    final TezDAGID dagId = TezDAGID.getInstance(appId, random.nextInt());
+    final TezVertexID vertexId = TezVertexID.getInstance(dagId, random.nextInt());
+    final TezTaskID taskId = TezTaskID.getInstance(vertexId, random.nextInt());
+    this.tezDAGID = dagId;
+    this.tezVertexID = vertexId;
+    this.tezTaskID = taskId;
+    this.tezTaskAttemptID = TezTaskAttemptID.getInstance(taskId, random.nextInt());
+
+    // Remaining fixtures.
+    this.dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
+    this.containerId = ContainerId.newInstance(attemptId, 111);
+    this.nodeId = NodeId.newInstance("node", 13435);
+  }
+
+ // Smoke test: for every HistoryEventType value, build a representative event and, when the
+ // event reports isHistoryEvent(), pass it to HistoryEventTimelineConversion. Nothing is
+ // asserted about the result; the test fails if an enum value has no case in the switch
+ // below (Assert.fail in the default branch) or if the conversion throws.
+ @Test(timeout = 5000)
+ public void testHandlerExists() throws JSONException {
+ for (HistoryEventType eventType : HistoryEventType.values()) {
+ HistoryEvent event = null;
+ // One case per event type; most constructor arguments are random or null placeholders.
+ switch (eventType) {
+ case APP_LAUNCHED:
+ event = new AppLaunchedEvent(applicationId, random.nextInt(), random.nextInt(),
+ user, new Configuration(false));
+ break;
+ case AM_LAUNCHED:
+ event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(),
+ user);
+ break;
+ case AM_STARTED:
+ event = new AMStartedEvent(applicationAttemptId, random.nextInt(), user);
+ break;
+ case DAG_SUBMITTED:
+ event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
+ null, user, null);
+ break;
+ case DAG_INITIALIZED:
+ event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
+ break;
+ case DAG_STARTED:
+ event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+ break;
+ case DAG_FINISHED:
+ event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
+ null, null, user, dagPlan.getName(), null);
+ break;
+ case VERTEX_INITIALIZED:
+ event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
+ random.nextInt(), "proc", null);
+ break;
+ case VERTEX_STARTED:
+ event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt());
+ break;
+ case VERTEX_PARALLELISM_UPDATED:
+ event = new VertexParallelismUpdatedEvent(tezVertexID, 1, null, null, null, 1);
+ break;
+ case VERTEX_FINISHED:
+ event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(),
+ random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
+ null, null, null, null);
+ break;
+ case TASK_STARTED:
+ event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());
+ break;
+ case TASK_FINISHED:
+ event = new TaskFinishedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt(),
+ tezTaskAttemptID, TaskState.FAILED, null, null);
+ break;
+ case TASK_ATTEMPT_STARTED:
+ event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
+ nodeId, null, null, "nodeHttpAddress");
+ break;
+ case TASK_ATTEMPT_FINISHED:
+ event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
+ random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null);
+ break;
+ case CONTAINER_LAUNCHED:
+ event = new ContainerLaunchedEvent(containerId, random.nextInt(),
+ applicationAttemptId);
+ break;
+ case CONTAINER_STOPPED:
+ event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
+ break;
+ // The remaining event types are created with their no-argument constructors.
+ case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+ event = new VertexRecoverableEventsGeneratedEvent();
+ break;
+ case DAG_COMMIT_STARTED:
+ event = new DAGCommitStartedEvent();
+ break;
+ case VERTEX_COMMIT_STARTED:
+ event = new VertexCommitStartedEvent();
+ break;
+ case VERTEX_GROUP_COMMIT_STARTED:
+ event = new VertexGroupCommitStartedEvent();
+ break;
+ case VERTEX_GROUP_COMMIT_FINISHED:
+ event = new VertexGroupCommitFinishedEvent();
+ break;
+ default:
+ Assert.fail("Unhandled event type " + eventType);
+ }
+ // Only events that identify themselves as history events are converted.
+ if (event == null || !event.isHistoryEvent()) {
+ continue;
+ }
+ HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ }
+ }
+
+  /**
+   * Converts an {@link AppLaunchedEvent} and verifies the resulting entity's start time, type,
+   * id, related entities, primary filters and the configuration stored in its other-info map.
+   */
+  @Test(timeout = 5000)
+  public void testConvertAppLaunchedEvent() {
+    final long launchTime = random.nextLong();
+    final long submitTime = random.nextLong();
+    final Configuration launchConf = new Configuration(false);
+    launchConf.set("foo", "bar");
+    launchConf.set("applicationId", "1234");
+
+    final AppLaunchedEvent appLaunched = new AppLaunchedEvent(applicationId, launchTime,
+        submitTime, user, launchConf);
+    final TimelineEntity entity =
+        HistoryEventTimelineConversion.convertToTimelineEntity(appLaunched);
+
+    // Start time and identity.
+    Assert.assertEquals(launchTime, entity.getStartTime().longValue());
+    Assert.assertEquals(EntityTypes.TEZ_APPLICATION.name(), entity.getEntityType());
+    Assert.assertEquals("tez_" + applicationId.toString(), entity.getEntityId());
+
+    // Related entities: user and application id.
+    Assert.assertEquals(2, entity.getRelatedEntities().size());
+    Assert.assertTrue(entity.getRelatedEntities().get(ATSConstants.USER).contains(user));
+    Assert.assertTrue(entity.getRelatedEntities().get(ATSConstants.APPLICATION_ID)
+        .contains(applicationId.toString()));
+
+    // Primary filters: user only.
+    Assert.assertEquals(1, entity.getPrimaryFilters().size());
+    Assert.assertTrue(entity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+
+    // Other info: the configuration map only.
+    final Map<String, Object> otherInfo = entity.getOtherInfo();
+    Assert.assertEquals(1, otherInfo.size());
+    Assert.assertTrue(otherInfo.containsKey(ATSConstants.CONFIG));
+
+    @SuppressWarnings("unchecked")
+    final Map<String, String> publishedConf =
+        (Map<String, String>) otherInfo.get(ATSConstants.CONFIG);
+    Assert.assertEquals(launchConf.get("foo"), publishedConf.get("foo"));
+    Assert.assertEquals(launchConf.get("applicationId"), publishedConf.get("applicationId"));
+  }
+
+  /**
+   * Converts a {@link ContainerLaunchedEvent} and verifies the resulting entity's type, id,
+   * related entities, primary filters, start time and its single timeline event.
+   */
+  @Test(timeout = 5000)
+  public void testConvertContainerLaunchedEvent() {
+    final long launchTime = random.nextLong();
+    final ContainerLaunchedEvent containerLaunched = new ContainerLaunchedEvent(containerId,
+        launchTime, applicationAttemptId);
+    final TimelineEntity entity =
+        HistoryEventTimelineConversion.convertToTimelineEntity(containerLaunched);
+
+    // Identity.
+    Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), entity.getEntityType());
+    Assert.assertEquals("tez_" + containerId.toString(), entity.getEntityId());
+
+    // Related entities: container id and owning application attempt.
+    Assert.assertEquals(2, entity.getRelatedEntities().size());
+    Assert.assertTrue(entity.getRelatedEntities().get(ATSConstants.CONTAINER_ID)
+        .contains(containerId.toString()));
+    Assert.assertTrue(entity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name())
+        .contains("tez_" + applicationAttemptId.toString()));
+
+    // Primary filters: application id only.
+    Assert.assertEquals(1, entity.getPrimaryFilters().size());
+    Assert.assertTrue(entity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID)
+        .contains(applicationAttemptId.getApplicationId().toString()));
+
+    Assert.assertEquals(launchTime, entity.getStartTime().longValue());
+
+    // Exactly one CONTAINER_LAUNCHED event stamped with the launch time.
+    Assert.assertEquals(1, entity.getEvents().size());
+    final TimelineEvent launchedEvent = entity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.CONTAINER_LAUNCHED.name(), launchedEvent.getEventType());
+    Assert.assertEquals(launchTime, launchedEvent.getTimestamp());
+  }
+
+  /**
+   * Converts a {@link DAGSubmittedEvent} and verifies the resulting entity's type, id, five
+   * related entities, single timeline event, start time, three primary filters and other info.
+   */
+  @Test(timeout = 5000)
+  public void testConvertDAGSubmittedEvent() {
+    final long submitTime = random.nextLong();
+
+    final DAGSubmittedEvent dagSubmitted = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan,
+        applicationAttemptId, null, user, null);
+    final TimelineEntity entity =
+        HistoryEventTimelineConversion.convertToTimelineEntity(dagSubmitted);
+
+    // Identity.
+    Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), entity.getEntityType());
+    Assert.assertEquals(tezDAGID.toString(), entity.getEntityId());
+
+    // Related entities.
+    final String attemptIdText = applicationAttemptId.toString();
+    final String attemptAppIdText = applicationAttemptId.getApplicationId().toString();
+    Assert.assertEquals(5, entity.getRelatedEntities().size());
+    Assert.assertTrue(entity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION.name())
+        .contains("tez_" + applicationId.toString()));
+    Assert.assertTrue(entity.getRelatedEntities().get(EntityTypes.TEZ_APPLICATION_ATTEMPT.name())
+        .contains("tez_" + attemptIdText));
+    Assert.assertTrue(entity.getRelatedEntities().get(ATSConstants.APPLICATION_ATTEMPT_ID)
+        .contains(attemptIdText));
+    Assert.assertTrue(entity.getRelatedEntities().get(ATSConstants.APPLICATION_ID)
+        .contains(attemptAppIdText));
+    Assert.assertTrue(entity.getRelatedEntities().get(ATSConstants.USER).contains(user));
+
+    // Exactly one DAG_SUBMITTED event stamped with the submit time.
+    Assert.assertEquals(1, entity.getEvents().size());
+    final TimelineEvent submittedEvent = entity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), submittedEvent.getEventType());
+    Assert.assertEquals(submitTime, submittedEvent.getTimestamp());
+
+    Assert.assertEquals(submitTime, entity.getStartTime().longValue());
+
+    // Primary filters: DAG name, application id and user.
+    Assert.assertEquals(3, entity.getPrimaryFilters().size());
+    Assert.assertTrue(entity.getPrimaryFilters().get(ATSConstants.DAG_NAME)
+        .contains(dagPlan.getName()));
+    Assert.assertTrue(entity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID)
+        .contains(attemptAppIdText));
+    Assert.assertTrue(entity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+
+    // Other info: DAG plan and application id.
+    Assert.assertTrue(entity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN));
+    Assert.assertEquals(applicationId.toString(),
+        entity.getOtherInfo().get(ATSConstants.APPLICATION_ID));
+  }
+
+  /**
+   * Converts a {@link DAGInitializedEvent} and verifies the resulting entity's type, id,
+   * absence of related entities, single timeline event, two primary filters and the
+   * vertex-name-to-id mapping stored in its other-info map.
+   */
+  @Test(timeout = 5000)
+  public void testConvertDAGInitializedEvent() {
+    long initTime = random.nextLong();
+
+    Map<String,TezVertexID> nameIdMap = new HashMap<String, TezVertexID>();
+    nameIdMap.put("foo", tezVertexID);
+
+    // Use the shared 'user' field (value "user") so the constructor argument and the primary
+    // filter assertion below refer to the same value.
+    DAGInitializedEvent event = new DAGInitializedEvent(tezDAGID, initTime, user, "dagName",
+        nameIdMap);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
+    Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+
+    Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
+
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+    TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.DAG_INITIALIZED.name(), timelineEvent.getEventType());
+    Assert.assertEquals(initTime, timelineEvent.getTimestamp());
+
+    Assert.assertEquals(2, timelineEntity.getPrimaryFilters().size());
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains("dagName"));
+    Assert.assertTrue(
+        timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+
+    Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(
+        ATSConstants.VERTEX_NAME_ID_MAPPING));
+    @SuppressWarnings("unchecked")
+    Map<String, String> vIdMap = (Map<String, String>) timelineEntity.getOtherInfo().get(
+        ATSConstants.VERTEX_NAME_ID_MAPPING);
+    Assert.assertEquals(1, vIdMap.size());
+    // containsKey() returns a boolean, so the former assertNotNull() on it could never fail.
+    Assert.assertTrue(vIdMap.containsKey("foo"));
+    Assert.assertEquals(tezVertexID.toString(), vIdMap.get("foo"));
+
+  }
+
+ @Test(timeout = 5000)
+ public void testConvertDAGFinishedEvent() {
+ long finishTime = random.nextLong();
+ long startTime = random.nextLong();
+ Map<String,Integer> taskStats = new HashMap<String, Integer>();
+ taskStats.put("FOO", 100);
+ taskStats.put("BAR", 200);
+ // NOTE(review): the null argument is presumably the counters object; confirm against DAGFinishedEvent.
+ DAGFinishedEvent event = new DAGFinishedEvent(tezDAGID, startTime, finishTime, DAGState.ERROR,
+ "diagnostics", null, user, dagPlan.getName(), taskStats);
+ // The converted entity must be typed TEZ_DAG_ID and keyed by the DAG id.
+ TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
+ Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+
+ Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
+ // Exactly one timeline event, of type DAG_FINISHED and stamped with finishTime.
+ Assert.assertEquals(1, timelineEntity.getEvents().size());
+ TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+ Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), timelineEvent.getEventType());
+ Assert.assertEquals(finishTime, timelineEvent.getTimestamp());
+ // Three primary filters are expected: DAG name, user and final status.
+ Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains(dagPlan.getName()));
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(ATSConstants.STATUS).contains(
+ DAGState.ERROR.name()));
+ // otherInfo checks: start/finish times, their difference, the counters key, status and diagnostics.
+ Assert.assertEquals(startTime,
+ ((Long)timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue());
+ Assert.assertEquals(finishTime,
+ ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
+ Assert.assertEquals(finishTime - startTime,
+ ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
+ Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.COUNTERS));
+ Assert.assertEquals(DAGState.ERROR.name(),
+ timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
+ Assert.assertEquals("diagnostics",
+ timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS));
+ // Each taskStats entry is expected directly under its own key in otherInfo.
+ Assert.assertEquals(100,
+ ((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue());
+ Assert.assertEquals(200,
+ ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue());
+ }
+
+ @Test(timeout = 5000)
+ public void testConvertVertexInitializedEvent() {
+ long initRequestedTime = random.nextLong();
+ long initedTime = random.nextLong();
+ int numTasks = random.nextInt();
+ VertexInitializedEvent event = new VertexInitializedEvent(tezVertexID, "v1", initRequestedTime,
+ initedTime, numTasks, "proc", null);
+ // The converted entity must be typed TEZ_VERTEX_ID and keyed by the vertex id.
+ TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
+ Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId());
+ // The entity start time is expected to equal the inited time, not the init-requested time.
+ Assert.assertEquals(initedTime, timelineEntity.getStartTime().longValue());
+ // The only related entity expected is the owning DAG.
+ Assert.assertEquals(1, timelineEntity.getRelatedEntities().size());
+ Assert.assertTrue(
+ timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_DAG_ID.name()).contains(
+ tezDAGID.toString()));
+ // Two primary filters are expected: application id and DAG id.
+ Assert.assertEquals(2, timelineEntity.getPrimaryFilters().size());
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
+ applicationId.toString()));
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
+ tezDAGID.toString()));
+ // Exactly one timeline event, of type VERTEX_INITIALIZED and stamped with initedTime.
+ Assert.assertEquals(1, timelineEntity.getEvents().size());
+ TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+ Assert.assertEquals(HistoryEventType.VERTEX_INITIALIZED.name(), timelineEvent.getEventType());
+ Assert.assertEquals(initedTime, timelineEvent.getTimestamp());
+ // otherInfo checks: vertex name, processor class name, init timings and task count.
+ Assert.assertEquals("v1", timelineEntity.getOtherInfo().get(ATSConstants.VERTEX_NAME));
+ Assert.assertEquals("proc", timelineEntity.getOtherInfo().get(ATSConstants.PROCESSOR_CLASS_NAME));
+
+ Assert.assertEquals(initedTime,
+ ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue());
+ Assert.assertEquals(initRequestedTime,
+ ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_REQUESTED_TIME)).longValue());
+ Assert.assertEquals(initedTime, // NOTE(review): repeats the INIT_TIME check made two assertions above
+ ((Long)timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue());
+ Assert.assertEquals(numTasks,
+ ((Integer)timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)).intValue());
+ }
+
+ @Test(timeout = 5000)
+ public void testConvertVertexFinishedEvent() {
+ long initRequestedTime = random.nextLong();
+ long initedTime = random.nextLong();
+ long startRequestedTime = random.nextLong();
+ long startTime = random.nextLong();
+ long finishTime = random.nextLong();
+ Map<String,Integer> taskStats = new HashMap<String, Integer>();
+ taskStats.put("FOO", 100);
+ taskStats.put("BAR", 200);
+ VertexStats vertexStats = new VertexStats();
+ // Event under test: built with VertexState.ERROR, a default VertexStats and the task stats above.
+ VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", 1,initRequestedTime,
+ initedTime, startRequestedTime, startTime, finishTime, VertexState.ERROR,
+ "diagnostics", null, vertexStats, taskStats);
+ // The converted entity must be typed TEZ_VERTEX_ID and keyed by the vertex id.
+ TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
+ Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId());
+
+ Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
+ // Two primary filters are expected: DAG id and final status.
+ Assert.assertEquals(2, timelineEntity.getPrimaryFilters().size());
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
+ tezDAGID.toString()));
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(ATSConstants.STATUS).contains(
+ VertexState.ERROR.name()));
+ // Exactly one timeline event, of type VERTEX_FINISHED and stamped with finishTime.
+ Assert.assertEquals(1, timelineEntity.getEvents().size());
+ TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+ Assert.assertEquals(HistoryEventType.VERTEX_FINISHED.name(), timelineEvent.getEventType());
+ Assert.assertEquals(finishTime, timelineEvent.getTimestamp());
+ // otherInfo checks: finish time, finish minus start as time taken, status and diagnostics.
+ Assert.assertEquals(finishTime,
+ ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
+ Assert.assertEquals(finishTime - startTime,
+ ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
+ Assert.assertEquals(VertexState.ERROR.name(),
+ timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
+ Assert.assertEquals("diagnostics",
+ timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS));
+ // Only the presence of the STATS key is verified, not its contents.
+ Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.STATS));
+ // Each taskStats entry is expected directly under its own key in otherInfo.
+ Assert.assertEquals(100,
+ ((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue());
+ Assert.assertEquals(200,
+ ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue());
+ }
+
+ @Test(timeout = 5000)
+ public void testConvertTaskStartedEvent() {
+ long scheduleTime = random.nextLong();
+ long startTime = random.nextLong();
+ TaskStartedEvent event = new TaskStartedEvent(tezTaskID, "v1", scheduleTime, startTime);
+ // The converted entity must be typed TEZ_TASK_ID and keyed by the task id.
+ TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType());
+ Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId());
+ // The entity start time is expected to equal the task start time, not the schedule time.
+ Assert.assertEquals(startTime, timelineEntity.getStartTime().longValue());
+ // The only related entity expected is the owning vertex.
+ Assert.assertEquals(1, timelineEntity.getRelatedEntities().size());
+ Assert.assertTrue(
+ timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_VERTEX_ID.name()).contains(
+ tezVertexID.toString()));
+ // Three primary filters are expected: application id, DAG id and vertex id.
+ Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
+ applicationId.toString()));
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
+ tezDAGID.toString()));
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_VERTEX_ID.name()).contains(
+ tezVertexID.toString()));
+ // Exactly one timeline event, of type TASK_STARTED and stamped with startTime.
+ Assert.assertEquals(1, timelineEntity.getEvents().size());
+ TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+ Assert.assertEquals(HistoryEventType.TASK_STARTED.name(), timelineEvent.getEventType());
+ Assert.assertEquals(startTime, timelineEvent.getTimestamp());
+ // otherInfo must hold both timestamps; presence is checked first, then the values.
+ Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.SCHEDULED_TIME));
+ Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.START_TIME));
+
+ Assert.assertEquals(scheduleTime,
+ ((Long)timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULED_TIME)).longValue());
+ Assert.assertEquals(startTime,
+ ((Long)timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue());
+ }
+
+ @Test(timeout = 5000)
+ public void testConvertTaskAttemptStartedEvent() {
+ long startTime = random.nextLong();
+ TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1",
+ startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress");
+ // The converted entity must be typed TEZ_TASK_ATTEMPT_ID and keyed by the attempt id.
+ TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
+ Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
+
+ Assert.assertEquals(startTime, timelineEntity.getStartTime().longValue());
+ // Three related entities are expected: node, container and owning task.
+ Assert.assertEquals(3, timelineEntity.getRelatedEntities().size());
+ Assert.assertTrue(
+ timelineEntity.getRelatedEntities().get(ATSConstants.NODE_ID).contains(nodeId.toString()));
+ Assert.assertTrue(
+ timelineEntity.getRelatedEntities().get(ATSConstants.CONTAINER_ID).contains(
+ containerId.toString()));
+ Assert.assertTrue(
+ timelineEntity.getRelatedEntities().get(EntityTypes.TEZ_TASK_ID.name()).contains(
+ tezTaskID.toString()));
+ // Exactly one timeline event, of type TASK_ATTEMPT_STARTED and stamped with startTime.
+ Assert.assertEquals(1, timelineEntity.getEvents().size());
+ TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+ Assert.assertEquals(HistoryEventType.TASK_ATTEMPT_STARTED.name(), timelineEvent.getEventType());
+ Assert.assertEquals(startTime, timelineEvent.getTimestamp());
+ // Four primary filters are expected: application id, DAG id, vertex id and task id.
+ Assert.assertEquals(4, timelineEntity.getPrimaryFilters().size());
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains(
+ applicationId.toString()));
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
+ tezDAGID.toString()));
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_VERTEX_ID.name()).contains(
+ tezVertexID.toString()));
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_TASK_ID.name()).contains(
+ tezTaskID.toString()));
+ // otherInfo checks: START_TIME presence only, then the URL, node and container values passed above.
+ Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.START_TIME));
+ Assert.assertEquals("inProgressURL",
+ timelineEntity.getOtherInfo().get(ATSConstants.IN_PROGRESS_LOGS_URL));
+ Assert.assertEquals("logsURL",
+ timelineEntity.getOtherInfo().get(ATSConstants.COMPLETED_LOGS_URL));
+ Assert.assertEquals(nodeId.toString(),
+ timelineEntity.getOtherInfo().get(ATSConstants.NODE_ID));
+ Assert.assertEquals(containerId.toString(),
+ timelineEntity.getOtherInfo().get(ATSConstants.CONTAINER_ID));
+ Assert.assertEquals("nodeHttpAddress",
+ timelineEntity.getOtherInfo().get(ATSConstants.NODE_HTTP_ADDRESS));
+ }
+
+ @Test(timeout = 5000)
+ public void testConvertVertexParallelismUpdatedEvent() {
+ TezVertexID vId = tezVertexID;
+ Map<String, EdgeManagerPluginDescriptor> edgeMgrs =
+ new HashMap<String, EdgeManagerPluginDescriptor>();
+ edgeMgrs.put("a", EdgeManagerPluginDescriptor.create("a.class").setHistoryText("text"));
+ VertexParallelismUpdatedEvent event = new VertexParallelismUpdatedEvent(vId, 1, null,
+ edgeMgrs, null, 10);
+ // The event is built with 1 and 10; they are asserted below under NUM_TASKS and OLD_NUM_TASKS.
+ TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ Assert.assertEquals(ATSConstants.TEZ_VERTEX_ID, timelineEntity.getEntityType());
+ Assert.assertEquals(vId.toString(), timelineEntity.getEntityId());
+ Assert.assertEquals(1, timelineEntity.getEvents().size());
+ // The single timeline event carries the task counts and the updated edge managers in its event info.
+ TimelineEvent evt = timelineEntity.getEvents().get(0);
+ Assert.assertEquals(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name(), evt.getEventType());
+ Assert.assertEquals(1, evt.getEventInfo().get(ATSConstants.NUM_TASKS));
+ Assert.assertEquals(10, evt.getEventInfo().get(ATSConstants.OLD_NUM_TASKS));
+ Assert.assertNotNull(evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS));
+ // NOTE(review): the casts below are unchecked; the nested value is assumed to be a Map per edge key.
+ Map<String, Object> updatedEdgeMgrs = (Map<String, Object>)
+ evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS);
+ Assert.assertEquals(1, updatedEdgeMgrs.size());
+ Assert.assertTrue(updatedEdgeMgrs.containsKey("a"));
+ Map<String, Object> updatedEdgeMgr = (Map<String, Object>) updatedEdgeMgrs.get("a");
+ // The edge manager class name given to EdgeManagerPluginDescriptor.create must be reported back.
+ Assert.assertEquals("a.class", updatedEdgeMgr.get(DAGUtils.EDGE_MANAGER_CLASS_KEY));
+ // The new task count is also expected in the entity's otherInfo.
+ Assert.assertEquals(1, timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS));
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/d6d03f9e/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
deleted file mode 100644
index 18ec43e..0000000
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.logging.ats;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
-import org.apache.tez.dag.records.TezDAGID;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestATSHistoryLoggingService {
-
- private static final Log LOG = LogFactory.getLog(TestATSHistoryLoggingService.class);
-
- private ATSHistoryLoggingService atsHistoryLoggingService;
- private AppContext appContext;
- private Configuration conf;
- private int atsInvokeCounter;
- private int atsEntitiesCounter;
- private SystemClock clock = new SystemClock();
-
- @Before
- public void setup() throws Exception {
- appContext = mock(AppContext.class);
- atsHistoryLoggingService = new ATSHistoryLoggingService();
- atsHistoryLoggingService.setAppContext(appContext);
- conf = new Configuration(false);
- conf.setLong(TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
- 1000l);
- conf.setInt(TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, 2);
- conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
- atsInvokeCounter = 0;
- atsEntitiesCounter = 0;
- atsHistoryLoggingService.init(conf);
- atsHistoryLoggingService.timelineClient = mock(TimelineClient.class);
- atsHistoryLoggingService.start();
- when(appContext.getClock()).thenReturn(clock);
- when(appContext.getCurrentDAGID()).thenReturn(null);
- when(atsHistoryLoggingService.timelineClient.putEntities(
- Matchers.<TimelineEntity[]>anyVararg())).thenAnswer(
- new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- ++atsInvokeCounter;
- atsEntitiesCounter += invocation.getArguments().length;
- try {
- Thread.sleep(500l);
- } catch (InterruptedException e) {
- // do nothing
- }
- return null;
- }
- }
- );
- }
-
- @After
- public void teardown() {
- atsHistoryLoggingService.stop();
- atsHistoryLoggingService = null;
- }
-
- @Test(timeout=20000)
- public void testATSHistoryLoggingServiceShutdown() {
- TezDAGID tezDAGID = TezDAGID.getInstance(
- ApplicationId.newInstance(100l, 1), 1);
- DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
- new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1"));
-
- for (int i = 0; i < 100; ++i) {
- atsHistoryLoggingService.handle(historyEvent);
- }
-
- try {
- Thread.sleep(2500l);
- } catch (InterruptedException e) {
- // Do nothing
- }
- atsHistoryLoggingService.stop();
-
- LOG.info("ATS entitiesSent=" + atsEntitiesCounter
- + ", timelineInvocations=" + atsInvokeCounter);
-
- Assert.assertTrue(atsEntitiesCounter >= 4);
- Assert.assertTrue(atsEntitiesCounter < 20);
-
- }
-
- @Test(timeout=20000)
- public void testATSEventBatching() {
- TezDAGID tezDAGID = TezDAGID.getInstance(
- ApplicationId.newInstance(100l, 1), 1);
- DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
- new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1"));
-
- for (int i = 0; i < 100; ++i) {
- atsHistoryLoggingService.handle(historyEvent);
- }
-
- try {
- Thread.sleep(1000l);
- } catch (InterruptedException e) {
- // Do nothing
- }
- LOG.info("ATS entitiesSent=" + atsEntitiesCounter
- + ", timelineInvocations=" + atsInvokeCounter);
-
- Assert.assertTrue(atsEntitiesCounter > atsInvokeCounter);
- Assert.assertEquals(atsEntitiesCounter/2, atsInvokeCounter);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/d6d03f9e/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
deleted file mode 100644
index 9c4f721..0000000
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.logging.ats;
-
-import java.io.IOException;
-import java.util.Random;
-
-import javax.ws.rs.core.MediaType;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.runtime.library.processor.SleepProcessor;
-import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
-import org.apache.tez.tests.MiniTezClusterWithTimeline;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-
-public class TestATSHistoryWithMiniCluster {
-
- private static final Log LOG = LogFactory.getLog(TestATSHistoryWithMiniCluster.class);
-
- protected static MiniTezClusterWithTimeline mrrTezCluster = null;
- protected static MiniDFSCluster dfsCluster = null;
- private static String timelineAddress;
- private Random random = new Random();
-
- private static Configuration conf = new Configuration();
- private static FileSystem remoteFs;
-
- private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
- + TestATSHistoryWithMiniCluster.class.getName() + "-tmpDir";
-
- @BeforeClass
- public static void setup() throws IOException {
- try {
- conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
- dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
- .build();
- remoteFs = dfsCluster.getFileSystem();
- } catch (IOException io) {
- throw new RuntimeException("problem starting mini dfs cluster", io);
- }
-
- if (mrrTezCluster == null) {
- try {
- mrrTezCluster = new MiniTezClusterWithTimeline(TestATSHistoryWithMiniCluster.class.getName(),
- 1, 1, 1, true);
- Configuration conf = new Configuration();
- conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
- conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
- conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000);
- mrrTezCluster.init(conf);
- mrrTezCluster.start();
- } catch (Throwable e) {
- LOG.info("Failed to start Mini Tez Cluster", e);
- }
- }
- timelineAddress = mrrTezCluster.getConfig().get(
- YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS);
- if (timelineAddress != null) {
- // Hack to handle bug in MiniYARNCluster handling of webapp address
- timelineAddress = timelineAddress.replace("0.0.0.0", "localhost");
- }
- }
-
- @AfterClass
- public static void tearDown() throws InterruptedException {
- LOG.info("Shutdown invoked");
- Thread.sleep(10000);
- if (mrrTezCluster != null) {
- mrrTezCluster.stop();
- mrrTezCluster = null;
- }
- if (dfsCluster != null) {
- dfsCluster.shutdown();
- dfsCluster = null;
- }
- }
-
- // To be replaced after Timeline has java APIs for domains
- private <K> K getTimelineData(String url, Class<K> clazz) {
- Client client = new Client();
- WebResource resource = client.resource(url);
-
- ClientResponse response = resource.accept(MediaType.APPLICATION_JSON)
- .get(ClientResponse.class);
- Assert.assertEquals(200, response.getStatus());
- Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
-
- K entity = response.getEntity(clazz);
- Assert.assertNotNull(entity);
- return entity;
- }
-
- @Test (timeout=50000)
- public void testSimpleAMACls() throws Exception {
- TezClient tezSession = null;
- ApplicationId applicationId;
- try {
- SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
- DAG dag = DAG.create("TezSleepProcessor");
- Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
- SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
- Resource.newInstance(256, 1));
- dag.addVertex(vertex);
-
- TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
- tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
- tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
- ATSHistoryLoggingService.class.getName());
- Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
- .nextInt(100000))));
- remoteFs.mkdirs(remoteStagingDir);
- tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-
- tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
- tezSession.start();
-
- applicationId = tezSession.getAppMasterApplicationId();
-
- DAGClient dagClient = tezSession.submitDAG(dag);
-
- DAGStatus dagStatus = dagClient.getDAGStatus(null);
- while (!dagStatus.isCompleted()) {
- LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
- + dagStatus.getState());
- Thread.sleep(500l);
- dagStatus = dagClient.getDAGStatus(null);
- }
- Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
- } finally {
- if (tezSession != null) {
- tezSession.stop();
- }
- }
-
-// verifyEntityExistence(applicationId);
- }
-
- @Test (timeout=50000)
- public void testDAGACls() throws Exception {
- TezClient tezSession = null;
- ApplicationId applicationId;
- try {
- SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
- DAG dag = DAG.create("TezSleepProcessor");
- Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
- SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
- Resource.newInstance(256, 1));
- dag.addVertex(vertex);
-
- TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
- tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
- tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
- ATSHistoryLoggingService.class.getName());
- Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
- .nextInt(100000))));
- remoteFs.mkdirs(remoteStagingDir);
- tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-
- tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
- tezSession.start();
-
- applicationId = tezSession.getAppMasterApplicationId();
-
- DAGClient dagClient = tezSession.submitDAG(dag);
-
- DAGStatus dagStatus = dagClient.getDAGStatus(null);
- while (!dagStatus.isCompleted()) {
- LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
- + dagStatus.getState());
- Thread.sleep(500l);
- dagStatus = dagClient.getDAGStatus(null);
- }
- Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
- } finally {
- if (tezSession != null) {
- tezSession.stop();
- }
- }
-// verifyEntityExistence(applicationId);
- }
-
- private void verifyEntityExistence(ApplicationId applicationId) {
- Assert.assertNotNull(timelineAddress);
-
- String appUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_APPLICATION/"
- + "tez_" + applicationId.toString() + "?fields=otherinfo";
- LOG.info("Getting timeline entity for tez application: " + appUrl);
- TimelineEntity appEntity = getTimelineData(appUrl, TimelineEntity.class);
- Assert.assertNotNull(appEntity);
-
- TezDAGID tezDAGID = TezDAGID.getInstance(applicationId, 1);
- String dagUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/"
- + tezDAGID.toString() + "?fields=otherinfo";
- LOG.info("Getting timeline entity for tez dag: " + dagUrl);
- TimelineEntity dagEntity = getTimelineData(dagUrl, TimelineEntity.class);
- Assert.assertNotNull(dagEntity);
- }
-
-
-}