You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ha...@apache.org on 2017/04/20 10:12:38 UTC
tez git commit: TEZ-3611. Create lightweight summary events for ATS.
(harishjp)
Repository: tez
Updated Branches:
refs/heads/master 59f56a540 -> e9d0b1b26
TEZ-3611. Create lightweight summary events for ATS. (harishjp)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e9d0b1b2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e9d0b1b2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e9d0b1b2
Branch: refs/heads/master
Commit: e9d0b1b266f93a15029159b2da94c5d438d7408b
Parents: 59f56a5
Author: Harish JP <ha...@gmail.com>
Authored: Thu Apr 20 15:42:16 2017 +0530
Committer: Harish JP <ha...@gmail.com>
Committed: Thu Apr 20 15:42:16 2017 +0530
----------------------------------------------------------------------
.../org/apache/tez/common/ATSConstants.java | 1 +
.../tez/dag/history/logging/EntityTypes.java | 1 +
.../org/apache/tez/history/ATSImportTool.java | 19 ++-
.../logging/ats/TimelineCachePluginImpl.java | 4 +-
.../ats/TestTimelineCachePluginImpl.java | 2 +
.../ats/ATSV15HistoryLoggingService.java | 11 +-
.../ats/TestATSV15HistoryLoggingService.java | 10 +-
.../logging/ats/ATSHistoryLoggingService.java | 8 +-
.../ats/HistoryEventTimelineConversion.java | 150 ++++++++++-------
.../ats/TestATSHistoryLoggingService.java | 8 +-
.../ats/TestHistoryEventTimelineConversion.java | 168 ++++++++++++++++---
11 files changed, 282 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index 25c802e..6e07849 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -47,6 +47,7 @@ public class ATSConstants {
public static final String USER = "user";
public static final String CALLER_CONTEXT_ID = "callerId";
public static final String CALLER_CONTEXT_TYPE = "callerType";
+ public static final String CALLER_CONTEXT = "callerContext";
/* Keys used in other info */
public static final String APP_SUBMIT_TIME = "appSubmitTime";
http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-common/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java b/tez-common/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
index e2f0882..6f6205d 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
@@ -23,6 +23,7 @@ public enum EntityTypes {
TEZ_APPLICATION_ATTEMPT,
TEZ_CONTAINER_ID,
TEZ_DAG_ID,
+ TEZ_DAG_EXTRA_INFO,
TEZ_VERTEX_ID,
TEZ_TASK_ID,
TEZ_TASK_ATTEMPT_ID,
http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
index 3efeb5a..fee226a 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
@@ -35,7 +35,6 @@ import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.MissingOptionException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -50,6 +49,7 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.history.logging.EntityTypes;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.history.parser.datamodel.Constants;
import org.apache.tez.history.parser.utils.Utils;
@@ -68,6 +68,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLEncoder;
+import java.util.Iterator;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
@@ -191,6 +192,22 @@ public class ATSImportTool extends Configured implements Tool {
//Download dag
String dagUrl = String.format("%s/%s/%s", baseUri, Constants.TEZ_DAG_ID, dagId);
JSONObject dagRoot = getJsonRootEntity(dagUrl);
+
+ // We have added dag extra info, if we find any from ATS we copy the info into dag object
+ // extra info.
+ String dagExtraInfoUrl = String.format("%s/%s/%s", baseUri, EntityTypes.TEZ_DAG_EXTRA_INFO,
+ dagId);
+ JSONObject dagExtraInfo = getJsonRootEntity(dagExtraInfoUrl);
+ if (dagExtraInfo.has(Constants.OTHER_INFO)) {
+ JSONObject dagOtherInfo = dagRoot.getJSONObject(Constants.OTHER_INFO);
+ JSONObject extraOtherInfo = dagExtraInfo.getJSONObject(Constants.OTHER_INFO);
+ @SuppressWarnings("unchecked")
+ Iterator<String> iter = extraOtherInfo.keys();
+ while (iter.hasNext()) {
+ String key = iter.next();
+ dagOtherInfo.put(key, extraOtherInfo.get(key));
+ }
+ }
finalJson.put(Constants.DAG, dagRoot);
//Create a zip entry with dagId as its name.
http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java b/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java
index 8269714..d211feb 100644
--- a/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java
+++ b/tez-plugins/tez-yarn-timeline-cache-plugin/src/main/java/org/apache/tez/dag/history/logging/ats/TimelineCachePluginImpl.java
@@ -50,6 +50,7 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin implement
static {
knownEntityTypes = Sets.newHashSet(
EntityTypes.TEZ_DAG_ID.name(),
+ EntityTypes.TEZ_DAG_EXTRA_INFO.name(),
EntityTypes.TEZ_VERTEX_ID.name(),
EntityTypes.TEZ_TASK_ID.name(),
EntityTypes.TEZ_TASK_ATTEMPT_ID.name(),
@@ -84,7 +85,8 @@ public class TimelineCachePluginImpl extends TimelineEntityGroupPlugin implement
|| entityId == null || entityId.isEmpty()) {
return null;
}
- if (entityType.equals(EntityTypes.TEZ_DAG_ID.name())) {
+ if (entityType.equals(EntityTypes.TEZ_DAG_ID.name()) ||
+ entityType.equals(EntityTypes.TEZ_DAG_EXTRA_INFO.name())) {
TezDAGID dagId = TezDAGID.fromString(entityId);
if (dagId != null) {
return createTimelineEntityGroupIds(dagId);
http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java b/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java
index 3d1af63..1bfa0a1 100644
--- a/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java
+++ b/tez-plugins/tez-yarn-timeline-cache-plugin/src/test/java/org/apache/tez/dag/history/logging/ats/TestTimelineCachePluginImpl.java
@@ -101,12 +101,14 @@ public class TestTimelineCachePluginImpl {
typeIdMap1 = new HashMap<String, String>();
typeIdMap1.put(EntityTypes.TEZ_DAG_ID.name(), dagID1.toString());
+ typeIdMap1.put(EntityTypes.TEZ_DAG_EXTRA_INFO.name(), dagID1.toString());
typeIdMap1.put(EntityTypes.TEZ_VERTEX_ID.name(), vertexID1.toString());
typeIdMap1.put(EntityTypes.TEZ_TASK_ID.name(), taskID1.toString());
typeIdMap1.put(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), attemptID1.toString());
typeIdMap2 = new HashMap<String, String>();
typeIdMap2.put(EntityTypes.TEZ_DAG_ID.name(), dagID2.toString());
+ typeIdMap2.put(EntityTypes.TEZ_DAG_EXTRA_INFO.name(), dagID2.toString());
typeIdMap2.put(EntityTypes.TEZ_VERTEX_ID.name(), vertexID2.toString());
typeIdMap2.put(EntityTypes.TEZ_TASK_ID.name(), taskID2.toString());
typeIdMap2.put(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), attemptID2.toString());
http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
index a095cbc..a71f0d8 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.history.logging.ats;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -375,16 +376,20 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
if (event.getDagID() != null && skippedDAGs.contains(event.getDagID())) {
return;
}
-
- TimelineEntity entity = HistoryEventTimelineConversion.convertToTimelineEntity(
+ TimelineEntityGroupId groupId = getGroupId(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(
event.getHistoryEvent());
+ for (TimelineEntity entity : entities) {
+ logEntity(groupId, entity, domainId);
+ }
+ }
+ private void logEntity(TimelineEntityGroupId groupId, TimelineEntity entity, String domainId) {
if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) {
historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
}
try {
- TimelineEntityGroupId groupId = getGroupId(event);
TimelinePutResponse response = timelineClient.putEntities(
appContext.getApplicationAttemptId(), groupId, entity);
if (response != null
http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
index ef5da81..96c3c80 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
@@ -111,7 +111,7 @@ public class TestATSV15HistoryLoggingService {
List<TimelineEntity> nonGroupedDagEvents = entityLog.get(
TimelineEntityGroupId.newInstance(appId, dagId1.toString()));
assertNotNull(nonGroupedDagEvents);
- assertEquals(4, nonGroupedDagEvents.size());
+ assertEquals(5, nonGroupedDagEvents.size());
service.stop();
}
@@ -139,7 +139,7 @@ public class TestATSV15HistoryLoggingService {
List<TimelineEntity> nonGroupedDagEvents = entityLog.get(
TimelineEntityGroupId.newInstance(appId, dagId1.toString()));
assertNotNull(nonGroupedDagEvents);
- assertEquals(4, nonGroupedDagEvents.size());
+ assertEquals(5, nonGroupedDagEvents.size());
service.stop();
}
@@ -185,7 +185,7 @@ public class TestATSV15HistoryLoggingService {
List<TimelineEntity> groupedDagEvents = entityLog.get(
TimelineEntityGroupId.newInstance(appId, dagId1.getGroupId(numDagsPerGroup)));
assertNotNull(groupedDagEvents);
- assertEquals(8, groupedDagEvents.size());
+ assertEquals(10, groupedDagEvents.size());
nonGroupedDagEvents = entityLog.get(
TimelineEntityGroupId.newInstance(appId, dagId3.toString()));
@@ -194,7 +194,7 @@ public class TestATSV15HistoryLoggingService {
groupedDagEvents = entityLog.get(
TimelineEntityGroupId.newInstance(appId, dagId3.getGroupId(numDagsPerGroup)));
assertNotNull(groupedDagEvents);
- assertEquals(4, groupedDagEvents.size());
+ assertEquals(5, groupedDagEvents.size());
service.stop();
}
@@ -338,7 +338,7 @@ public class TestATSV15HistoryLoggingService {
// calls were made with correct domain ids.
verify(historyACLPolicyManager, times(1)).updateTimelineEntityDomain(any(), eq("session-id"));
- verify(historyACLPolicyManager, times(4)).updateTimelineEntityDomain(any(), eq("dag-id"));
+ verify(historyACLPolicyManager, times(5)).updateTimelineEntityDomain(any(), eq("dag-id"));
service.stop();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index dc215fd..6d035cc 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -336,11 +336,13 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
if (event.getDagID() != null && skippedDAGs.contains(event.getDagID())) {
continue;
}
- TimelineEntity entity = HistoryEventTimelineConversion.convertToTimelineEntity(
+ List<TimelineEntity> eventEntities = HistoryEventTimelineConversion.convertToTimelineEntities(
event.getHistoryEvent());
- entities.add(entity);
+ entities.addAll(eventEntities);
if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) {
- historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
+ for (TimelineEntity entity: eventEntities) {
+ historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index faccc98..235a292 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -19,7 +19,9 @@
package org.apache.tez.dag.history.logging.ats;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
@@ -31,6 +33,7 @@ import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
import org.apache.tez.dag.app.web.AMWebController;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
@@ -56,82 +59,82 @@ import org.apache.tez.dag.history.logging.EntityTypes;
import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezVertexID;
+import com.google.common.collect.Lists;
+
public class HistoryEventTimelineConversion {
- public static TimelineEntity convertToTimelineEntity(HistoryEvent historyEvent) {
- if (!historyEvent.isHistoryEvent()) {
+ private static void validateEvent(HistoryEvent event) {
+ if (!event.isHistoryEvent()) {
throw new UnsupportedOperationException("Invalid Event, does not support history"
- + ", eventType=" + historyEvent.getEventType());
+ + ", eventType=" + event.getEventType());
}
- TimelineEntity timelineEntity;
+ }
+
+ public static List<TimelineEntity> convertToTimelineEntities(HistoryEvent historyEvent) {
+ validateEvent(historyEvent);
switch (historyEvent.getEventType()) {
case APP_LAUNCHED:
- timelineEntity = convertAppLaunchedEvent((AppLaunchedEvent) historyEvent);
- break;
+ return Collections.singletonList(convertAppLaunchedEvent((AppLaunchedEvent) historyEvent));
case AM_LAUNCHED:
- timelineEntity = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
- break;
+ return Collections.singletonList(convertAMLaunchedEvent((AMLaunchedEvent) historyEvent));
case AM_STARTED:
- timelineEntity = convertAMStartedEvent((AMStartedEvent) historyEvent);
- break;
+ return Collections.singletonList(convertAMStartedEvent((AMStartedEvent) historyEvent));
case CONTAINER_LAUNCHED:
- timelineEntity = convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent);
- break;
+ return Collections.singletonList(
+ convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent));
case CONTAINER_STOPPED:
- timelineEntity = convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent);
- break;
+ return Collections.singletonList(
+ convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent));
case DAG_SUBMITTED:
- timelineEntity = convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent);
- break;
+ return Lists.newArrayList(
+ convertDAGSubmittedToDAGExtraInfoEntity((DAGSubmittedEvent)historyEvent),
+ convertDAGSubmittedEvent((DAGSubmittedEvent)historyEvent));
case DAG_INITIALIZED:
- timelineEntity = convertDAGInitializedEvent((DAGInitializedEvent) historyEvent);
- break;
+ return Collections.singletonList(
+ convertDAGInitializedEvent((DAGInitializedEvent) historyEvent));
case DAG_STARTED:
- timelineEntity = convertDAGStartedEvent((DAGStartedEvent) historyEvent);
- break;
+ return Collections.singletonList(convertDAGStartedEvent((DAGStartedEvent) historyEvent));
case DAG_FINISHED:
- timelineEntity = convertDAGFinishedEvent((DAGFinishedEvent) historyEvent);
- break;
+ return Lists.newArrayList(
+ convertDAGFinishedToDAGExtraInfoEntity((DAGFinishedEvent) historyEvent),
+ convertDAGFinishedEvent((DAGFinishedEvent) historyEvent));
case VERTEX_INITIALIZED:
- timelineEntity = convertVertexInitializedEvent((VertexInitializedEvent) historyEvent);
- break;
+ return Collections.singletonList(
+ convertVertexInitializedEvent((VertexInitializedEvent) historyEvent));
case VERTEX_STARTED:
- timelineEntity = convertVertexStartedEvent((VertexStartedEvent) historyEvent);
- break;
+ return Collections.singletonList(
+ convertVertexStartedEvent((VertexStartedEvent) historyEvent));
case VERTEX_FINISHED:
- timelineEntity = convertVertexFinishedEvent((VertexFinishedEvent) historyEvent);
- break;
+ return Collections.singletonList(
+ convertVertexFinishedEvent((VertexFinishedEvent) historyEvent));
case TASK_STARTED:
- timelineEntity = convertTaskStartedEvent((TaskStartedEvent) historyEvent);
- break;
+ return Collections.singletonList(convertTaskStartedEvent((TaskStartedEvent) historyEvent));
case TASK_FINISHED:
- timelineEntity = convertTaskFinishedEvent((TaskFinishedEvent) historyEvent);
- break;
+ return Collections.singletonList(
+ convertTaskFinishedEvent((TaskFinishedEvent) historyEvent));
case TASK_ATTEMPT_STARTED:
- timelineEntity = convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent);
- break;
+ return Collections.singletonList(
+ convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent));
case TASK_ATTEMPT_FINISHED:
- timelineEntity = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
- break;
+ return Collections.singletonList(
+ convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent));
case VERTEX_CONFIGURE_DONE:
- timelineEntity = convertVertexReconfigureDoneEvent(
- (VertexConfigurationDoneEvent) historyEvent);
- break;
+ return Collections.singletonList(
+ convertVertexReconfigureDoneEvent((VertexConfigurationDoneEvent) historyEvent));
case DAG_RECOVERED:
- timelineEntity = convertDAGRecoveredEvent(
- (DAGRecoveredEvent) historyEvent);
- break;
+ return Collections.singletonList(
+ convertDAGRecoveredEvent((DAGRecoveredEvent) historyEvent));
case VERTEX_COMMIT_STARTED:
case VERTEX_GROUP_COMMIT_STARTED:
case VERTEX_GROUP_COMMIT_FINISHED:
case DAG_COMMIT_STARTED:
+ case DAG_KILL_REQUEST:
throw new UnsupportedOperationException("Invalid Event, does not support history"
+ ", eventType=" + historyEvent.getEventType());
- default:
- throw new UnsupportedOperationException("Unhandled Event"
- + ", eventType=" + historyEvent.getEventType());
+ // Do not add default, if a new event type is added, we'll get a warning for the switch.
}
- return timelineEntity;
+ throw new UnsupportedOperationException("Unhandled Event, eventType=" +
+ historyEvent.getEventType());
}
private static TimelineEntity convertDAGRecoveredEvent(DAGRecoveredEvent event) {
@@ -309,8 +312,6 @@ public class HistoryEventTimelineConversion {
atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
- atsEntity.addOtherInfo(ATSConstants.COUNTERS,
- DAGUtils.convertCountersToATSMap(event.getTezCounters()));
atsEntity.addOtherInfo(ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID,
event.getApplicationAttemptId().toString());
@@ -324,6 +325,24 @@ public class HistoryEventTimelineConversion {
return atsEntity;
}
+ private static TimelineEntity convertDAGFinishedToDAGExtraInfoEntity(DAGFinishedEvent event) {
+ TimelineEntity atsEntity = new TimelineEntity();
+ atsEntity.setEntityId(event.getDagID().toString());
+ atsEntity.setEntityType(EntityTypes.TEZ_DAG_EXTRA_INFO.name());
+
+ atsEntity.addRelatedEntity(EntityTypes.TEZ_DAG_ID.name(), event.getDagID().toString());
+
+ TimelineEvent submitEvt = new TimelineEvent();
+ submitEvt.setEventType(HistoryEventType.DAG_FINISHED.name());
+ submitEvt.setTimestamp(event.getFinishTime());
+ atsEntity.addEvent(submitEvt);
+
+ atsEntity.addOtherInfo(ATSConstants.COUNTERS,
+ DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+ return atsEntity;
+ }
+
+
private static TimelineEntity convertDAGInitializedEvent(DAGInitializedEvent event) {
TimelineEntity atsEntity = new TimelineEntity();
atsEntity.setEntityId(event.getDagID().toString());
@@ -397,19 +416,15 @@ public class HistoryEventTimelineConversion {
if (event.getDAGPlan().hasCallerContext()
&& event.getDAGPlan().getCallerContext().hasCallerId()) {
- atsEntity.addPrimaryFilter(ATSConstants.CALLER_CONTEXT_ID,
- event.getDAGPlan().getCallerContext().getCallerId());
+ CallerContextProto callerContext = event.getDagPlan().getCallerContext();
+ atsEntity.addPrimaryFilter(ATSConstants.CALLER_CONTEXT_ID, callerContext.getCallerId());
+ atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_ID, callerContext.getCallerId());
+ atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT, callerContext.getContext());
}
if (event.getQueueName() != null) {
atsEntity.addPrimaryFilter(ATSConstants.DAG_QUEUE_NAME, event.getQueueName());
}
- try {
- atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
- DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
- } catch (IOException e) {
- throw new TezUncheckedException(e);
- }
atsEntity.addOtherInfo(ATSConstants.APPLICATION_ID,
event.getApplicationAttemptId().getApplicationId().toString());
atsEntity.addOtherInfo(ATSConstants.APPLICATION_ATTEMPT_ID,
@@ -433,6 +448,29 @@ public class HistoryEventTimelineConversion {
return atsEntity;
}
+ private static TimelineEntity convertDAGSubmittedToDAGExtraInfoEntity(DAGSubmittedEvent event) {
+ TimelineEntity atsEntity = new TimelineEntity();
+ atsEntity.setEntityId(event.getDagID().toString());
+ atsEntity.setEntityType(EntityTypes.TEZ_DAG_EXTRA_INFO.name());
+
+ atsEntity.addRelatedEntity(EntityTypes.TEZ_DAG_ID.name(), event.getDagID().toString());
+
+ TimelineEvent submitEvt = new TimelineEvent();
+ submitEvt.setEventType(HistoryEventType.DAG_SUBMITTED.name());
+ submitEvt.setTimestamp(event.getSubmitTime());
+ atsEntity.addEvent(submitEvt);
+
+ atsEntity.setStartTime(event.getSubmitTime());
+
+ try {
+ atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
+ DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
+ return atsEntity;
+ }
+
private static TimelineEntity convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
TimelineEntity atsEntity = new TimelineEntity();
atsEntity.setEntityId(event.getTaskAttemptID().toString());
http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
index a641cda..6603f4f 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
@@ -248,7 +248,7 @@ public class TestATSHistoryLoggingService {
.setupSessionDAGACLs((Configuration)any(), eq(appId), eq("0"), (DAGAccessControls)any());
// All calls made with session domain id.
- verify(historyACLPolicyManager, times(5)).updateTimelineEntityDomain(any(), eq("session-id"));
+ verify(historyACLPolicyManager, times(6)).updateTimelineEntityDomain(any(), eq("session-id"));
}
@Test(timeout=10000)
@@ -299,7 +299,7 @@ public class TestATSHistoryLoggingService {
// All calls made with session domain id.
verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), eq("session-id"));
- Assert.assertEquals(5, atsEntitiesCounter);
+ Assert.assertEquals(6, atsEntitiesCounter);
}
@Test(timeout=10000)
@@ -333,7 +333,7 @@ public class TestATSHistoryLoggingService {
// All calls made with session domain id.
verify(historyACLPolicyManager, times(1)).updateTimelineEntityDomain(any(), eq("test-domain"));
- verify(historyACLPolicyManager, times(4)).updateTimelineEntityDomain(any(), eq("dag-domain"));
+ verify(historyACLPolicyManager, times(5)).updateTimelineEntityDomain(any(), eq("dag-domain"));
}
@Test(timeout=10000)
@@ -433,7 +433,7 @@ public class TestATSHistoryLoggingService {
// All calls made with session domain id.
verify(historyACLPolicyManager, times(0)).updateTimelineEntityDomain(any(), (String)any());
- Assert.assertEquals(5, atsEntitiesCounter);
+ Assert.assertEquals(6, atsEntitiesCounter);
}
private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId,
http://git-wip-us.apache.org/repos/asf/tez/blob/e9d0b1b2/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 28fd5da..1663cb0 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -221,7 +221,7 @@ public class TestHistoryEventTimelineConversion {
if (event == null || !event.isHistoryEvent()) {
continue;
}
- HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ HistoryEventTimelineConversion.convertToTimelineEntities(event);
}
}
@@ -259,7 +259,7 @@ public class TestHistoryEventTimelineConversion {
MockVersionInfo mockVersionInfo = new MockVersionInfo();
AppLaunchedEvent event = new AppLaunchedEvent(applicationId, launchTime,
submitTime, user, conf, mockVersionInfo);
- HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ HistoryEventTimelineConversion.convertToTimelineEntities(event);
} finally {
shutdown.set(true);
confChanger.join();
@@ -279,7 +279,9 @@ public class TestHistoryEventTimelineConversion {
AppLaunchedEvent event = new AppLaunchedEvent(applicationId, launchTime,
submitTime, user, conf, mockVersionInfo);
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(1, entities.size());
+ TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals(launchTime, timelineEntity.getStartTime().longValue());
@@ -322,7 +324,9 @@ public class TestHistoryEventTimelineConversion {
long submitTime = random.nextLong();
AMLaunchedEvent event = new AMLaunchedEvent(applicationAttemptId, launchTime, submitTime, user);
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(1, entities.size());
+ TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals("tez_" + applicationAttemptId.toString(), timelineEntity.getEntityId());
Assert.assertEquals(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), timelineEntity.getEntityType());
@@ -357,7 +361,9 @@ public class TestHistoryEventTimelineConversion {
AMStartedEvent event = new AMStartedEvent(applicationAttemptId, startTime, user);
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(1, entities.size());
+ TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals("tez_" + applicationAttemptId.toString(), timelineEntity.getEntityId());
Assert.assertEquals(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), timelineEntity.getEntityType());
@@ -383,7 +389,9 @@ public class TestHistoryEventTimelineConversion {
long launchTime = random.nextLong();
ContainerLaunchedEvent event = new ContainerLaunchedEvent(containerId, launchTime,
applicationAttemptId);
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(1, entities.size());
+ TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals("tez_" + containerId.toString(), timelineEntity.getEntityId());
@@ -414,7 +422,9 @@ public class TestHistoryEventTimelineConversion {
int exitStatus = random.nextInt();
ContainerStoppedEvent event = new ContainerStoppedEvent(containerId, stopTime, exitStatus,
applicationAttemptId);
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(1, entities.size());
+ TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals("tez_" + containerId.toString(), timelineEntity.getEntityId());
Assert.assertEquals(EntityTypes.TEZ_CONTAINER_ID.name(), timelineEntity.getEntityType());
@@ -446,7 +456,10 @@ public class TestHistoryEventTimelineConversion {
long startTime = random.nextLong();
String dagName = "testDagName";
DAGStartedEvent event = new DAGStartedEvent(tezDAGID, startTime, user, dagName);
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(1, entities.size());
+ TimelineEntity timelineEntity = entities.get(0);
+
Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
@@ -477,7 +490,21 @@ public class TestHistoryEventTimelineConversion {
DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan,
applicationAttemptId, null, user, null, containerLogs, queueName);
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(2, entities.size());
+
+
+ if (entities.get(0).getEntityType().equals(EntityTypes.TEZ_DAG_ID.name())) {
+ assertDagSubmittedEntity(submitTime, event, entities.get(0));
+ assertDagSubmittedExtraInfoEntity(submitTime, event, entities.get(1));
+ } else {
+ assertDagSubmittedExtraInfoEntity(submitTime, event, entities.get(0));
+ assertDagSubmittedEntity(submitTime, event, entities.get(1));
+ }
+ }
+
+ private void assertDagSubmittedEntity(long submitTime, DAGSubmittedEvent event,
+ TimelineEntity timelineEntity) {
Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
@@ -511,10 +538,9 @@ public class TestHistoryEventTimelineConversion {
timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_QUEUE_NAME)
- .contains(queueName));
+ .contains(event.getQueueName()));
Assert.assertEquals(9, timelineEntity.getOtherInfo().size());
- Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN));
Assert.assertEquals(applicationId.toString(),
timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID));
Assert.assertEquals(applicationAttemptId.toString(),
@@ -534,9 +560,31 @@ public class TestHistoryEventTimelineConversion {
Assert.assertEquals(
timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_TYPE),
dagPlan.getCallerContext().getCallerType());
+ Assert.assertEquals(dagPlan.getCallerContext().getContext(),
+ timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT));
Assert.assertEquals(
- queueName, timelineEntity.getOtherInfo().get(ATSConstants.DAG_QUEUE_NAME));
+ event.getQueueName(), timelineEntity.getOtherInfo().get(ATSConstants.DAG_QUEUE_NAME));
+
+ }
+
+ private void assertDagSubmittedExtraInfoEntity(long submitTime, DAGSubmittedEvent event,
+ TimelineEntity timelineEntity) {
+ Assert.assertEquals(EntityTypes.TEZ_DAG_EXTRA_INFO.name(), timelineEntity.getEntityType());
+ Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+
+ Assert.assertEquals(1, timelineEntity.getRelatedEntities().size());
+ Assert.assertTrue(timelineEntity.getRelatedEntities()
+ .get(EntityTypes.TEZ_DAG_ID.name()).contains(tezDAGID.toString()));
+ Assert.assertEquals(1, timelineEntity.getEvents().size());
+ TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+ Assert.assertEquals(HistoryEventType.DAG_SUBMITTED.name(), timelineEvent.getEventType());
+ Assert.assertEquals(submitTime, timelineEvent.getTimestamp());
+
+ Assert.assertEquals(submitTime, timelineEntity.getStartTime().longValue());
+ Assert.assertEquals(0, timelineEntity.getPrimaryFilters().size());
+ Assert.assertEquals(1, timelineEntity.getOtherInfo().size());
+ Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN));
}
@SuppressWarnings("unchecked")
@@ -561,7 +609,10 @@ public class TestHistoryEventTimelineConversion {
TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
startTime, finishTime, state, TaskFailureType.FATAL, error, diagnostics, counters, events, null, creationTime,
tezTaskAttemptID, allocationTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress");
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(1, entities.size());
+ TimelineEntity timelineEntity = entities.get(0);
+
Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
@@ -613,8 +664,11 @@ public class TestHistoryEventTimelineConversion {
creationTime,
tezTaskAttemptID, allocationTime, containerId, nodeId, "inProgressURL", "logsURL",
"nodeHttpAddress");
- TimelineEntity timelineEntityWithNullFailureType =
- HistoryEventTimelineConversion.convertToTimelineEntity(eventWithNullFailureType);
+ List<TimelineEntity> evtEntities = HistoryEventTimelineConversion.convertToTimelineEntities(
+ eventWithNullFailureType);
+ Assert.assertEquals(1, evtEntities.size());
+ TimelineEntity timelineEntityWithNullFailureType = evtEntities.get(0);
+
Assert.assertNull(
timelineEntityWithNullFailureType.getOtherInfo().get(ATSConstants.TASK_FAILURE_TYPE));
}
@@ -630,7 +684,11 @@ public class TestHistoryEventTimelineConversion {
DAGInitializedEvent event = new DAGInitializedEvent(tezDAGID, initTime, "user", "dagName",
nameIdMap);
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(1, entities.size());
+ TimelineEntity timelineEntity = entities.get(0);
+
Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
@@ -671,7 +729,20 @@ public class TestHistoryEventTimelineConversion {
DAGFinishedEvent event = new DAGFinishedEvent(tezDAGID, startTime, finishTime, DAGState.ERROR,
"diagnostics", null, user, dagPlan.getName(), taskStats, applicationAttemptId, dagPlan);
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(2, entities.size());
+
+ if (entities.get(0).getEntityType().equals(EntityTypes.TEZ_DAG_ID.name())) {
+ assertDagFinishedEntity(finishTime, startTime, event, entities.get(0));
+ assertDagFinishedExtraInfoEntity(finishTime, entities.get(1));
+ } else {
+ assertDagFinishedExtraInfoEntity(finishTime, entities.get(0));
+ assertDagFinishedEntity(finishTime, startTime, event, entities.get(1));
+ }
+ }
+
+ private void assertDagFinishedEntity(long finishTime, long startTime, DAGFinishedEvent event,
+ TimelineEntity timelineEntity) {
Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
@@ -703,7 +774,6 @@ public class TestHistoryEventTimelineConversion {
((Long) timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
Assert.assertEquals(finishTime - startTime,
((Long) timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
- Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.COUNTERS));
Assert.assertEquals(DAGState.ERROR.name(),
timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
Assert.assertEquals("diagnostics",
@@ -717,6 +787,23 @@ public class TestHistoryEventTimelineConversion {
((Integer) timelineEntity.getOtherInfo().get("BAR")).intValue());
}
+ private void assertDagFinishedExtraInfoEntity(long finishTime, TimelineEntity timelineEntity) {
+ Assert.assertEquals(EntityTypes.TEZ_DAG_EXTRA_INFO.name(), timelineEntity.getEntityType());
+ Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+
+ Assert.assertEquals(1, timelineEntity.getRelatedEntities().size());
+ Assert.assertTrue(
+ timelineEntity.getRelatedEntities().get(ATSConstants.TEZ_DAG_ID).contains(
+ tezDAGID.toString()));
+
+ Assert.assertEquals(1, timelineEntity.getEvents().size());
+ TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+ Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), timelineEvent.getEventType());
+ Assert.assertEquals(finishTime, timelineEvent.getTimestamp());
+
+ Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.COUNTERS));
+ }
+
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testConvertVertexInitializedEvent() {
@@ -731,7 +818,11 @@ public class TestHistoryEventTimelineConversion {
.setTaskSchedulerClassName("def1")
.setTaskCommunicatorClassName("ghi1"));
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(1, entities.size());
+ TimelineEntity timelineEntity = entities.get(0);
+
Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId());
@@ -794,7 +885,10 @@ public class TestHistoryEventTimelineConversion {
VertexStartedEvent event = new VertexStartedEvent(tezVertexID, startRequestedTime, startTime);
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(1, entities.size());
+ TimelineEntity timelineEntity = entities.get(0);
+
Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId());
@@ -844,7 +938,10 @@ public class TestHistoryEventTimelineConversion {
.setTaskSchedulerClassName("def1")
.setTaskCommunicatorClassName("ghi1"));
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(1, entities.size());
+ TimelineEntity timelineEntity = entities.get(0);
+
Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId());
@@ -909,7 +1006,10 @@ public class TestHistoryEventTimelineConversion {
long startTime = random.nextLong();
TaskStartedEvent event = new TaskStartedEvent(tezTaskID, "v1", scheduleTime, startTime);
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(1, entities.size());
+ TimelineEntity timelineEntity = entities.get(0);
+
Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId());
@@ -953,7 +1053,10 @@ public class TestHistoryEventTimelineConversion {
TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1",
startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress");
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(1, entities.size());
+ TimelineEntity timelineEntity = entities.get(0);
+
Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
@@ -1009,7 +1112,9 @@ public class TestHistoryEventTimelineConversion {
TaskFinishedEvent event = new TaskFinishedEvent(tezTaskID, vertexName, startTime, finishTime,
tezTaskAttemptID, state, diagnostics, counters, 3);
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(1, entities.size());
+ TimelineEntity timelineEntity = entities.get(0);
Assert.assertEquals(tezTaskID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(EntityTypes.TEZ_TASK_ID.name(), timelineEntity.getEntityType());
@@ -1054,7 +1159,10 @@ public class TestHistoryEventTimelineConversion {
VertexConfigurationDoneEvent event = new VertexConfigurationDoneEvent(vId, 0L, 1, null,
edgeMgrs, null, true);
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(1, entities.size());
+ TimelineEntity timelineEntity = entities.get(0);
+
Assert.assertEquals(ATSConstants.TEZ_VERTEX_ID, timelineEntity.getEntityType());
Assert.assertEquals(vId.toString(), timelineEntity.getEntityId());
Assert.assertEquals(1, timelineEntity.getEvents().size());
@@ -1092,7 +1200,10 @@ public class TestHistoryEventTimelineConversion {
DAGRecoveredEvent event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID,
dagPlan.getName(), user, recoverTime, containerLogs);
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(1, entities.size());
+ TimelineEntity timelineEntity = entities.get(0);
+
Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
@@ -1128,7 +1239,10 @@ public class TestHistoryEventTimelineConversion {
dagPlan.getName(), user, recoverTime, DAGState.ERROR, "mock reason", containerLogs);
- TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ List<TimelineEntity> entities = HistoryEventTimelineConversion.convertToTimelineEntities(event);
+ Assert.assertEquals(1, entities.size());
+ TimelineEntity timelineEntity = entities.get(0);
+
Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());