You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by va...@apache.org on 2017/08/29 20:41:47 UTC
[02/50] [abbrv] hadoop git commit: YARN-5792. Adopt the id prefix for
YARN, MR, and DS entities. Contributed by Varun Saxena.
YARN-5792. Adopt the id prefix for YARN, MR, and DS entities. Contributed by Varun Saxena.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c3d6c749
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c3d6c749
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c3d6c749
Branch: refs/heads/YARN-5355
Commit: c3d6c7494f399256762baa90553d52e749ea8568
Parents: 25d5b02
Author: Sangjin Lee <sj...@apache.org>
Authored: Mon Nov 21 13:48:35 2016 -0800
Committer: Varun Saxena <va...@apache.org>
Committed: Wed Aug 30 01:18:42 2017 +0530
----------------------------------------------------------------------
.../jobhistory/JobHistoryEventHandler.java | 29 +++-
.../v2/app/job/impl/TaskAttemptImpl.java | 58 +++----
.../mapreduce/v2/app/job/impl/TaskImpl.java | 19 +-
.../hadoop/mapreduce/jobhistory/TestEvents.java | 4 +-
.../jobhistory/TestJobHistoryEventHandler.java | 8 +-
.../jobhistory/MapAttemptFinishedEvent.java | 87 ++++++----
.../jobhistory/ReduceAttemptFinishedEvent.java | 83 ++++++---
.../jobhistory/TaskAttemptFinishedEvent.java | 47 +++--
.../TaskAttemptUnsuccessfulCompletionEvent.java | 54 ++++--
.../mapreduce/jobhistory/TaskFailedEvent.java | 51 ++++--
.../mapreduce/jobhistory/TaskFinishedEvent.java | 42 +++--
.../mapred/TestMRTimelineEventHandling.java | 30 +++-
.../distributedshell/ApplicationMaster.java | 42 ++++-
.../distributedshell/TestDistributedShell.java | 173 +++++++++++--------
.../containermanager/ContainerManagerImpl.java | 6 +-
.../ApplicationContainerFinishedEvent.java | 9 +-
.../containermanager/container/Container.java | 2 +
.../container/ContainerImpl.java | 22 ++-
.../recovery/NMLeveldbStateStoreService.java | 21 ++-
.../recovery/NMNullStateStoreService.java | 2 +-
.../recovery/NMStateStoreService.java | 13 +-
.../timelineservice/NMTimelinePublisher.java | 18 +-
.../application/TestApplication.java | 2 +-
.../recovery/NMMemoryStateStoreService.java | 4 +-
.../TestNMLeveldbStateStoreService.java | 6 +-
.../nodemanager/webapp/MockContainer.java | 4 +
.../nodemanager/webapp/TestNMWebServer.java | 4 +-
.../resourcemanager/ResourceTrackerService.java | 14 +-
.../metrics/TimelineServiceV2Publisher.java | 12 +-
.../TestSystemMetricsPublisherForV2.java | 13 +-
30 files changed, 590 insertions(+), 289 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 53fe055..a1a31f9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -78,6 +78,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.util.TimelineServiceHelper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.JsonNodeFactory;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;
@@ -1124,7 +1127,7 @@ public class JobHistoryEventHandler extends AbstractService
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createTaskEntity(HistoryEvent event, long timestamp, String taskId,
String entityType, String relatedJobEntity, JobId jobId,
- boolean setCreatedTime) {
+ boolean setCreatedTime, long taskIdPrefix) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
createBaseEntity(event, timestamp, entityType, setCreatedTime);
entity.setId(taskId);
@@ -1133,6 +1136,7 @@ public class JobHistoryEventHandler extends AbstractService
((TaskStartedEvent)event).getTaskType().toString());
}
entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString());
+ entity.setIdPrefix(taskIdPrefix);
return entity;
}
@@ -1141,11 +1145,12 @@ public class JobHistoryEventHandler extends AbstractService
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createTaskAttemptEntity(HistoryEvent event, long timestamp,
String taskAttemptId, String entityType, String relatedTaskEntity,
- String taskId, boolean setCreatedTime) {
+ String taskId, boolean setCreatedTime, long taskAttemptIdPrefix) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
createBaseEntity(event, timestamp, entityType, setCreatedTime);
entity.setId(taskAttemptId);
entity.addIsRelatedToEntity(relatedTaskEntity, taskId);
+ entity.setIdPrefix(taskAttemptIdPrefix);
return entity;
}
@@ -1196,6 +1201,8 @@ public class JobHistoryEventHandler extends AbstractService
String taskId = null;
String taskAttemptId = null;
boolean setCreatedTime = false;
+ long taskIdPrefix = 0;
+ long taskAttemptIdPrefix = 0;
switch (event.getEventType()) {
// Handle job events
@@ -1218,15 +1225,21 @@ public class JobHistoryEventHandler extends AbstractService
case TASK_STARTED:
setCreatedTime = true;
taskId = ((TaskStartedEvent)event).getTaskId().toString();
+ taskIdPrefix = TimelineServiceHelper.
+ invertLong(((TaskStartedEvent)event).getStartTime());
break;
case TASK_FAILED:
taskId = ((TaskFailedEvent)event).getTaskId().toString();
+ taskIdPrefix = TimelineServiceHelper.
+ invertLong(((TaskFailedEvent)event).getStartTime());
break;
case TASK_UPDATED:
taskId = ((TaskUpdatedEvent)event).getTaskId().toString();
break;
case TASK_FINISHED:
taskId = ((TaskFinishedEvent)event).getTaskId().toString();
+ taskIdPrefix = TimelineServiceHelper.
+ invertLong(((TaskFinishedEvent)event).getStartTime());
break;
case MAP_ATTEMPT_STARTED:
case REDUCE_ATTEMPT_STARTED:
@@ -1234,6 +1247,8 @@ public class JobHistoryEventHandler extends AbstractService
taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptStartedEvent)event).
getTaskAttemptId().toString();
+ taskAttemptIdPrefix = TimelineServiceHelper.
+ invertLong(((TaskAttemptStartedEvent)event).getStartTime());
break;
case CLEANUP_ATTEMPT_STARTED:
case SETUP_ATTEMPT_STARTED:
@@ -1253,16 +1268,22 @@ public class JobHistoryEventHandler extends AbstractService
getTaskId().toString();
taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
getTaskAttemptId().toString();
+ taskAttemptIdPrefix = TimelineServiceHelper.invertLong(
+ ((TaskAttemptUnsuccessfulCompletionEvent)event).getStartTime());
break;
case MAP_ATTEMPT_FINISHED:
taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((MapAttemptFinishedEvent)event).
getAttemptId().toString();
+ taskAttemptIdPrefix = TimelineServiceHelper.
+ invertLong(((MapAttemptFinishedEvent)event).getStartTime());
break;
case REDUCE_ATTEMPT_FINISHED:
taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((ReduceAttemptFinishedEvent)event).
getAttemptId().toString();
+ taskAttemptIdPrefix = TimelineServiceHelper.
+ invertLong(((ReduceAttemptFinishedEvent)event).getStartTime());
break;
case SETUP_ATTEMPT_FINISHED:
case CLEANUP_ATTEMPT_FINISHED:
@@ -1291,12 +1312,12 @@ public class JobHistoryEventHandler extends AbstractService
// TaskEntity
tEntity = createTaskEntity(event, timestamp, taskId,
MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE,
- jobId, setCreatedTime);
+ jobId, setCreatedTime, taskIdPrefix);
} else {
// TaskAttemptEntity
tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId,
MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE,
- taskId, setCreatedTime);
+ taskId, setCreatedTime, taskAttemptIdPrefix);
}
}
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 9ea1b9a..3faad48 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -1530,7 +1530,7 @@ public abstract class TaskAttemptImpl implements
StringUtils.join(
LINE_SEPARATOR, taskAttempt.getDiagnostics()),
taskAttempt.getCounters(), taskAttempt
- .getProgressSplitBlock().burst());
+ .getProgressSplitBlock().burst(), taskAttempt.launchTime);
return tauce;
}
@@ -1943,35 +1943,35 @@ public abstract class TaskAttemptImpl implements
this.container == null ? -1 : this.container.getNodeId().getPort();
if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
MapAttemptFinishedEvent mfe =
- new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
- TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
- state.toString(),
- this.reportedStatus.mapFinishTime,
- finishTime,
- containerHostName,
- containerNodePort,
- this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
- this.reportedStatus.stateString,
- getCounters(),
- getProgressSplitBlock().burst());
- eventHandler.handle(
- new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
+ new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
+ TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
+ state.toString(),
+ this.reportedStatus.mapFinishTime,
+ finishTime,
+ containerHostName,
+ containerNodePort,
+ this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
+ this.reportedStatus.stateString,
+ getCounters(),
+ getProgressSplitBlock().burst(), launchTime);
+ eventHandler.handle(
+ new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
} else {
- ReduceAttemptFinishedEvent rfe =
- new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
- TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
- state.toString(),
- this.reportedStatus.shuffleFinishTime,
- this.reportedStatus.sortFinishTime,
- finishTime,
- containerHostName,
- containerNodePort,
- this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
- this.reportedStatus.stateString,
- getCounters(),
- getProgressSplitBlock().burst());
- eventHandler.handle(
- new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
+ ReduceAttemptFinishedEvent rfe =
+ new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
+ TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
+ state.toString(),
+ this.reportedStatus.shuffleFinishTime,
+ this.reportedStatus.sortFinishTime,
+ finishTime,
+ containerHostName,
+ containerNodePort,
+ this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
+ this.reportedStatus.stateString,
+ getCounters(),
+ getProgressSplitBlock().burst(), launchTime);
+ eventHandler.handle(
+ new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
index 8a6fa30..228ae24 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
@@ -139,6 +139,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
private final Set<TaskAttemptId> inProgressAttempts;
private boolean historyTaskStartGenerated = false;
+ // Launch time reported in history events.
+ private long launchTime;
private static final SingleArcTransition<TaskImpl, TaskEvent>
ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
@@ -705,8 +707,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
}
private void sendTaskStartedEvent() {
+ launchTime = getLaunchTime();
TaskStartedEvent tse = new TaskStartedEvent(
- TypeConverter.fromYarn(taskId), getLaunchTime(),
+ TypeConverter.fromYarn(taskId), launchTime,
TypeConverter.fromYarn(taskId.getTaskType()),
getSplitsAsString());
eventHandler
@@ -714,18 +717,19 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
historyTaskStartGenerated = true;
}
- private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
+ private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task,
+ TaskStateInternal taskState) {
TaskFinishedEvent tfe =
new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
TypeConverter.fromYarn(task.successfulAttempt),
task.getFinishTime(task.successfulAttempt),
TypeConverter.fromYarn(task.taskId.getTaskType()),
- taskState.toString(),
- task.getCounters());
+ taskState.toString(), task.getCounters(), task.launchTime);
return tfe;
}
- private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskStateInternal taskState, TaskAttemptId taId) {
+ private static TaskFailedEvent createTaskFailedEvent(TaskImpl task,
+ List<String> diag, TaskStateInternal taskState, TaskAttemptId taId) {
StringBuilder errorSb = new StringBuilder();
if (diag != null) {
for (String d : diag) {
@@ -740,7 +744,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
errorSb.toString(),
taskState.toString(),
taId == null ? null : TypeConverter.fromYarn(taId),
- task.getCounters());
+ task.getCounters(), task.launchTime);
return taskFailedEvent;
}
@@ -861,7 +865,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskFailedEvent tfe = new TaskFailedEvent(taskInfo.getTaskId(),
taskInfo.getFinishTime(), taskInfo.getTaskType(),
taskInfo.getError(), taskInfo.getTaskStatus(),
- taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters());
+ taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters(),
+ launchTime);
eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
eventHandler.handle(
new JobTaskEvent(taskId, getExternalState(taskState)));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
index ac510b3..e271319 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
@@ -58,7 +58,7 @@ public class TestEvents {
Counters counters = new Counters();
TaskAttemptFinishedEvent test = new TaskAttemptFinishedEvent(taskAttemptId,
TaskType.REDUCE, "TEST", 123L, "RAKNAME", "HOSTNAME", "STATUS",
- counters);
+ counters, 234);
assertEquals(test.getAttemptId().toString(), taskAttemptId.toString());
assertEquals(test.getCounters(), counters);
@@ -69,7 +69,7 @@ public class TestEvents {
assertEquals(test.getTaskId(), tid);
assertEquals(test.getTaskStatus(), "TEST");
assertEquals(test.getTaskType(), TaskType.REDUCE);
-
+ assertEquals(234, test.getStartTime());
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index caf8c67..e35a84d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -148,7 +148,7 @@ public class TestJobHistoryEventHandler {
// First completion event, but min-queue-size for batching flushes is 10
handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
- t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
+ t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
verify(mockWriter).flush();
} finally {
@@ -184,7 +184,7 @@ public class TestJobHistoryEventHandler {
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
- t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
+ t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
}
handleNextNEvents(jheh, 9);
@@ -229,7 +229,7 @@ public class TestJobHistoryEventHandler {
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
- t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
+ t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
}
handleNextNEvents(jheh, 9);
@@ -272,7 +272,7 @@ public class TestJobHistoryEventHandler {
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
- t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
+ t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0)));
}
queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters())));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
index 3121c4e..2b1357e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
@@ -32,9 +32,10 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
/**
- * Event to record successful completion of a map attempt
+ * Event to record successful completion of a map attempt.
*
*/
@InterfaceAudience.Private
@@ -58,9 +59,10 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
int[] cpuUsages;
int[] vMemKbytes;
int[] physMemKbytes;
+ private long startTime;
/**
- * Create an event for successful completion of map attempts
+ * Create an event for successful completion of map attempts.
* @param id Task Attempt ID
* @param taskType Type of the task
* @param taskStatus Status of the task
@@ -77,12 +79,13 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
* virtual memory and physical memory.
*
* If you have no splits data, code {@code null} for this
- * parameter.
+ * parameter.
+ * @param startTs Task start time to be used for writing entity to ATSv2.
*/
- public MapAttemptFinishedEvent
- (TaskAttemptID id, TaskType taskType, String taskStatus,
- long mapFinishTime, long finishTime, String hostname, int port,
- String rackName, String state, Counters counters, int[][] allSplits) {
+ public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+ String taskStatus, long mapFinishTime, long finishTime, String hostname,
+ int port, String rackName, String state, Counters counters,
+ int[][] allSplits, long startTs) {
this.attemptId = id;
this.taskType = taskType;
this.taskStatus = taskStatus;
@@ -98,6 +101,16 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits);
this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
+ this.startTime = startTs;
+ }
+
+ public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+ String taskStatus, long mapFinishTime, long finishTime, String hostname,
+ int port, String rackName, String state, Counters counters,
+ int[][] allSplits) {
+ this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, port,
+ rackName, state, counters, allSplits,
+ SystemClock.getInstance().getTime());
}
/**
@@ -117,15 +130,13 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
* @param counters Counters for the attempt
*/
@Deprecated
- public MapAttemptFinishedEvent
- (TaskAttemptID id, TaskType taskType, String taskStatus,
- long mapFinishTime, long finishTime, String hostname,
- String state, Counters counters) {
+ public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+ String taskStatus, long mapFinishTime, long finishTime, String hostname,
+ String state, Counters counters) {
this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, -1, "",
state, counters, null);
}
-
-
+
MapAttemptFinishedEvent() {}
public Object getDatum() {
@@ -175,38 +186,56 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
this.physMemKbytes = AvroArrayUtils.fromAvro(datum.getPhysMemKbytes());
}
- /** Get the task ID */
- public TaskID getTaskId() { return attemptId.getTaskID(); }
- /** Get the attempt id */
+ /** Gets the task ID. */
+ public TaskID getTaskId() {
+ return attemptId.getTaskID();
+ }
+ /** Gets the attempt id. */
public TaskAttemptID getAttemptId() {
return attemptId;
}
- /** Get the task type */
+ /** Gets the task type. */
public TaskType getTaskType() {
return taskType;
}
- /** Get the task status */
+ /** Gets the task status. */
public String getTaskStatus() { return taskStatus.toString(); }
- /** Get the map phase finish time */
+ /** Gets the map phase finish time. */
public long getMapFinishTime() { return mapFinishTime; }
- /** Get the attempt finish time */
+ /** Gets the attempt finish time. */
public long getFinishTime() { return finishTime; }
- /** Get the host name */
+ /**
+ * Gets the task attempt start time.
+ * @return task attempt start time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+ /** Gets the host name. */
public String getHostname() { return hostname.toString(); }
- /** Get the tracker rpc port */
+ /** Gets the tracker rpc port. */
public int getPort() { return port; }
- /** Get the rack name */
+ /** Gets the rack name. */
public String getRackName() {
return rackName == null ? null : rackName.toString();
}
-
- /** Get the state string */
- public String getState() { return state.toString(); }
- /** Get the counters */
- Counters getCounters() { return counters; }
- /** Get the event type */
+ /**
+ * Gets the attempt state string.
+ * @return map attempt state
+ */
+ public String getState() {
+ return state.toString();
+ }
+ /**
+ * Gets the counters.
+ * @return counters
+ */
+ Counters getCounters() {
+ return counters;
+ }
+ /** Gets the event type. */
public EventType getEventType() {
return EventType.MAP_ATTEMPT_FINISHED;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
index 9c0f09b..5a16f83 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record successful completion of a reduce attempt
@@ -59,6 +60,7 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
int[] cpuUsages;
int[] vMemKbytes;
int[] physMemKbytes;
+ private long startTime;
/**
* Create an event to record completion of a reduce attempt
@@ -76,13 +78,13 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
* @param allSplits the "splits", or a pixelated graph of various
* measurable worker node state variables against progress.
* Currently there are four; wallclock time, CPU time,
- * virtual memory and physical memory.
+ * virtual memory and physical memory.
+ * @param startTs Task start time to be used for writing entity to ATSv2.
*/
- public ReduceAttemptFinishedEvent
- (TaskAttemptID id, TaskType taskType, String taskStatus,
- long shuffleFinishTime, long sortFinishTime, long finishTime,
- String hostname, int port, String rackName, String state,
- Counters counters, int[][] allSplits) {
+ public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+ String taskStatus, long shuffleFinishTime, long sortFinishTime,
+ long finishTime, String hostname, int port, String rackName,
+ String state, Counters counters, int[][] allSplits, long startTs) {
this.attemptId = id;
this.taskType = taskType;
this.taskStatus = taskStatus;
@@ -99,6 +101,16 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits);
this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
+ this.startTime = startTs;
+ }
+
+ public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+ String taskStatus, long shuffleFinishTime, long sortFinishTime,
+ long finishTime, String hostname, int port, String rackName,
+ String state, Counters counters, int[][] allSplits) {
+ this(id, taskType, taskStatus, shuffleFinishTime, sortFinishTime,
+ finishTime, hostname, port, rackName, state, counters, allSplits,
+ SystemClock.getInstance().getTime());
}
/**
@@ -118,13 +130,12 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
* @param state State of the attempt
* @param counters Counters for the attempt
*/
- public ReduceAttemptFinishedEvent
- (TaskAttemptID id, TaskType taskType, String taskStatus,
- long shuffleFinishTime, long sortFinishTime, long finishTime,
- String hostname, String state, Counters counters) {
+ public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+ String taskStatus, long shuffleFinishTime, long sortFinishTime,
+ long finishTime, String hostname, String state, Counters counters) {
this(id, taskType, taskStatus,
- shuffleFinishTime, sortFinishTime, finishTime,
- hostname, -1, "", state, counters, null);
+ shuffleFinishTime, sortFinishTime, finishTime,
+ hostname, -1, "", state, counters, null);
}
ReduceAttemptFinishedEvent() {}
@@ -178,39 +189,55 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
this.physMemKbytes = AvroArrayUtils.fromAvro(datum.getPhysMemKbytes());
}
- /** Get the Task ID */
+ /** Gets the Task ID. */
public TaskID getTaskId() { return attemptId.getTaskID(); }
- /** Get the attempt id */
+ /** Gets the attempt id. */
public TaskAttemptID getAttemptId() {
return attemptId;
}
- /** Get the task type */
+ /** Gets the task type. */
public TaskType getTaskType() {
return taskType;
}
- /** Get the task status */
+ /** Gets the task status. */
public String getTaskStatus() { return taskStatus.toString(); }
- /** Get the finish time of the sort phase */
+ /** Gets the finish time of the sort phase. */
public long getSortFinishTime() { return sortFinishTime; }
- /** Get the finish time of the shuffle phase */
+ /** Gets the finish time of the shuffle phase. */
public long getShuffleFinishTime() { return shuffleFinishTime; }
- /** Get the finish time of the attempt */
+ /** Gets the finish time of the attempt. */
public long getFinishTime() { return finishTime; }
- /** Get the name of the host where the attempt ran */
+ /**
+ * Gets the start time.
+ * @return task attempt start time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+ /** Gets the name of the host where the attempt ran. */
public String getHostname() { return hostname.toString(); }
- /** Get the tracker rpc port */
+ /** Gets the tracker rpc port. */
public int getPort() { return port; }
- /** Get the rack name of the node where the attempt ran */
+ /** Gets the rack name of the node where the attempt ran. */
public String getRackName() {
return rackName == null ? null : rackName.toString();
}
-
- /** Get the state string */
- public String getState() { return state.toString(); }
- /** Get the counters for the attempt */
- Counters getCounters() { return counters; }
- /** Get the event type */
+ /**
+ * Gets the state string.
+ * @return reduce attempt state
+ */
+ public String getState() {
+ return state.toString();
+ }
+ /**
+ * Gets the counters.
+ * @return counters
+ */
+ Counters getCounters() {
+ return counters;
+ }
+ /** Gets the event type. */
public EventType getEventType() {
return EventType.REDUCE_ATTEMPT_FINISHED;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
index a931ca2..c28c216 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record successful task completion
@@ -50,10 +51,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
private String hostname;
private String state;
private Counters counters;
+ private long startTime;
/**
- * Create an event to record successful finishes for setup and cleanup
- * attempts
+ * Create an event to record successful finishes for setup and cleanup
+ * attempts.
* @param id Attempt ID
* @param taskType Type of task
* @param taskStatus Status of task
@@ -61,11 +63,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
* @param hostname Host where the attempt executed
* @param state State string
* @param counters Counters for the attempt
+ * @param startTs Task start time to be used for writing entity to ATSv2.
*/
public TaskAttemptFinishedEvent(TaskAttemptID id,
TaskType taskType, String taskStatus,
long finishTime, String rackName,
- String hostname, String state, Counters counters) {
+ String hostname, String state, Counters counters, long startTs) {
this.attemptId = id;
this.taskType = taskType;
this.taskStatus = taskStatus;
@@ -74,6 +77,14 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.hostname = hostname;
this.state = state;
this.counters = counters;
+ this.startTime = startTs;
+ }
+
+ public TaskAttemptFinishedEvent(TaskAttemptID id, TaskType taskType,
+ String taskStatus, long finishTime, String rackName, String hostname,
+ String state, Counters counters) {
+ this(id, taskType, taskStatus, finishTime, rackName, hostname, state,
+ counters, SystemClock.getInstance().getTime());
}
TaskAttemptFinishedEvent() {}
@@ -107,33 +118,43 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.counters = EventReader.fromAvro(datum.getCounters());
}
- /** Get the task ID */
+ /** Gets the task ID. */
public TaskID getTaskId() { return attemptId.getTaskID(); }
- /** Get the task attempt id */
+ /** Gets the task attempt id. */
public TaskAttemptID getAttemptId() {
return attemptId;
}
- /** Get the task type */
+ /** Gets the task type. */
public TaskType getTaskType() {
return taskType;
}
- /** Get the task status */
+ /** Gets the task status. */
public String getTaskStatus() { return taskStatus.toString(); }
- /** Get the attempt finish time */
+ /** Gets the attempt finish time. */
public long getFinishTime() { return finishTime; }
- /** Get the host where the attempt executed */
+ /**
+ * Gets the task attempt start time to be used while publishing to ATSv2.
+ * @return task attempt start time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+ /** Gets the host where the attempt executed. */
public String getHostname() { return hostname.toString(); }
- /** Get the rackname where the attempt executed */
+ /** Gets the rackname where the attempt executed. */
public String getRackName() {
return rackName == null ? null : rackName.toString();
}
- /** Get the state string */
+ /**
+ * Gets the state string.
+ * @return task attempt state.
+ */
public String getState() { return state.toString(); }
- /** Get the counters for the attempt */
+ /** Gets the counters for the attempt. */
Counters getCounters() { return counters; }
- /** Get the event type */
+ /** Gets the event type. */
public EventType getEventType() {
// Note that the task type can be setup/map/reduce/cleanup but the
// attempt-type can only be map/reduce.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
index 1752967..1529125 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record unsuccessful (Killed/Failed) completion of task attempts
@@ -58,10 +59,11 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
int[] cpuUsages;
int[] vMemKbytes;
int[] physMemKbytes;
+ private long startTime;
private static final Counters EMPTY_COUNTERS = new Counters();
- /**
- * Create an event to record the unsuccessful completion of attempts
+ /**
+ * Create an event to record the unsuccessful completion of attempts.
* @param id Attempt ID
* @param taskType Type of the task
* @param status Status of the attempt
@@ -75,12 +77,13 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
* measurable worker node state variables against progress.
* Currently there are four; wallclock time, CPU time,
* virtual memory and physical memory.
+ * @param startTs Task start time to be used for writing entity to ATSv2.
*/
public TaskAttemptUnsuccessfulCompletionEvent
(TaskAttemptID id, TaskType taskType,
String status, long finishTime,
String hostname, int port, String rackName,
- String error, Counters counters, int[][] allSplits) {
+ String error, Counters counters, int[][] allSplits, long startTs) {
this.attemptId = id;
this.taskType = taskType;
this.status = status;
@@ -99,6 +102,15 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
this.physMemKbytes =
ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
+ this.startTime = startTs;
+ }
+
+ public TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID id,
+ TaskType taskType, String status, long finishTime, String hostname,
+ int port, String rackName, String error, Counters counters,
+ int[][] allSplits) {
+ this(id, taskType, status, finishTime, hostname, port, rackName, error,
+ counters, allSplits, SystemClock.getInstance().getTime());
}
/**
@@ -190,39 +202,49 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
AvroArrayUtils.fromAvro(datum.getPhysMemKbytes());
}
- /** Get the task id */
+ /** Gets the task id. */
public TaskID getTaskId() {
return attemptId.getTaskID();
}
- /** Get the task type */
+ /** Gets the task type. */
public TaskType getTaskType() {
return TaskType.valueOf(taskType.toString());
}
- /** Get the attempt id */
+ /** Gets the attempt id. */
public TaskAttemptID getTaskAttemptId() {
return attemptId;
}
- /** Get the finish time */
+ /** Gets the finish time. */
public long getFinishTime() { return finishTime; }
- /** Get the name of the host where the attempt executed */
+ /**
+ * Gets the task attempt start time to be used while publishing to ATSv2.
+ * @return task attempt start time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+ /** Gets the name of the host where the attempt executed. */
public String getHostname() { return hostname; }
- /** Get the rpc port for the host where the attempt executed */
+ /** Gets the rpc port for the host where the attempt executed. */
public int getPort() { return port; }
-
- /** Get the rack name of the node where the attempt ran */
+
+ /** Gets the rack name of the node where the attempt ran. */
public String getRackName() {
return rackName == null ? null : rackName.toString();
}
-
- /** Get the error string */
+
+ /** Gets the error string. */
public String getError() { return error.toString(); }
- /** Get the task status */
+ /**
+ * Gets the task attempt status.
+ * @return task attempt status.
+ */
public String getTaskStatus() {
return status.toString();
}
- /** Get the counters */
+ /** Gets the counters. */
Counters getCounters() { return counters; }
- /** Get the event type */
+ /** Gets the event type. */
public EventType getEventType() {
// Note that the task type can be setup/map/reduce/cleanup but the
// attempt-type can only be map/reduce.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
index d14350d..b4d9e41 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record the failure of a task
@@ -49,11 +50,12 @@ public class TaskFailedEvent implements HistoryEvent {
private String status;
private String error;
private Counters counters;
+ private long startTime;
private static final Counters EMPTY_COUNTERS = new Counters();
/**
- * Create an event to record task failure
+ * Create an event to record task failure.
* @param id Task ID
* @param finishTime Finish time of the task
* @param taskType Type of the task
@@ -61,10 +63,11 @@ public class TaskFailedEvent implements HistoryEvent {
* @param status Status
* @param failedDueToAttempt The attempt id due to which the task failed
* @param counters Counters for the task
+ * @param startTs task start time.
*/
public TaskFailedEvent(TaskID id, long finishTime,
TaskType taskType, String error, String status,
- TaskAttemptID failedDueToAttempt, Counters counters) {
+ TaskAttemptID failedDueToAttempt, Counters counters, long startTs) {
this.id = id;
this.finishTime = finishTime;
this.taskType = taskType;
@@ -72,15 +75,23 @@ public class TaskFailedEvent implements HistoryEvent {
this.status = status;
this.failedDueToAttempt = failedDueToAttempt;
this.counters = counters;
+ this.startTime = startTs;
+ }
+
+ public TaskFailedEvent(TaskID id, long finishTime, TaskType taskType,
+ String error, String status, TaskAttemptID failedDueToAttempt,
+ Counters counters) {
+ this(id, finishTime, taskType, error, status, failedDueToAttempt, counters,
+ SystemClock.getInstance().getTime());
}
public TaskFailedEvent(TaskID id, long finishTime,
- TaskType taskType, String error, String status,
- TaskAttemptID failedDueToAttempt) {
- this(id, finishTime, taskType, error, status,
- failedDueToAttempt, EMPTY_COUNTERS);
+ TaskType taskType, String error, String status,
+ TaskAttemptID failedDueToAttempt) {
+ this(id, finishTime, taskType, error, status, failedDueToAttempt,
+ EMPTY_COUNTERS);
}
-
+
TaskFailedEvent() {}
public Object getDatum() {
@@ -118,27 +129,37 @@ public class TaskFailedEvent implements HistoryEvent {
EventReader.fromAvro(datum.getCounters());
}
- /** Get the task id */
+ /** Gets the task id. */
public TaskID getTaskId() { return id; }
- /** Get the error string */
+ /** Gets the error string. */
public String getError() { return error; }
- /** Get the finish time of the attempt */
+ /** Gets the finish time of the attempt. */
public long getFinishTime() {
return finishTime;
}
- /** Get the task type */
+ /**
+ * Gets the task start time to be reported to ATSv2.
+ * @return task start time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+ /** Gets the task type. */
public TaskType getTaskType() {
return taskType;
}
- /** Get the attempt id due to which the task failed */
+ /** Gets the attempt id due to which the task failed. */
public TaskAttemptID getFailedAttemptID() {
return failedDueToAttempt;
}
- /** Get the task status */
+ /**
+ * Gets the task status.
+ * @return task status
+ */
public String getTaskStatus() { return status; }
- /** Get task counters */
+ /** Gets task counters. */
public Counters getCounters() { return counters; }
- /** Get the event type */
+ /** Gets the event type. */
public EventType getEventType() {
return EventType.TASK_FAILED;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
index 0bc4383..97557c7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.util.SystemClock;
/**
* Event to record the successful completion of a task
@@ -49,27 +50,36 @@ public class TaskFinishedEvent implements HistoryEvent {
private TaskType taskType;
private String status;
private Counters counters;
-
+ private long startTime;
+
/**
- * Create an event to record the successful completion of a task
+ * Create an event to record the successful completion of a task.
* @param id Task ID
* @param attemptId Task Attempt ID of the successful attempt for this task
* @param finishTime Finish time of the task
* @param taskType Type of the task
* @param status Status string
* @param counters Counters for the task
+ * @param startTs task start time
*/
public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime,
TaskType taskType,
- String status, Counters counters) {
+ String status, Counters counters, long startTs) {
this.taskid = id;
this.successfulAttemptId = attemptId;
this.finishTime = finishTime;
this.taskType = taskType;
this.status = status;
this.counters = counters;
+ this.startTime = startTs;
}
-
+
+ public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime,
+ TaskType taskType, String status, Counters counters) {
+ this(id, attemptId, finishTime, taskType, status, counters,
+ SystemClock.getInstance().getTime());
+ }
+
TaskFinishedEvent() {}
public Object getDatum() {
@@ -101,23 +111,33 @@ public class TaskFinishedEvent implements HistoryEvent {
this.counters = EventReader.fromAvro(datum.getCounters());
}
- /** Get task id */
+ /** Gets task id. */
public TaskID getTaskId() { return taskid; }
- /** Get successful task attempt id */
+ /** Gets successful task attempt id. */
public TaskAttemptID getSuccessfulTaskAttemptId() {
return successfulAttemptId;
}
- /** Get the task finish time */
+ /** Gets the task finish time. */
public long getFinishTime() { return finishTime; }
- /** Get task counters */
+ /**
+ * Gets the task start time to be reported to ATSv2.
+ * @return task start time
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+ /** Gets task counters. */
public Counters getCounters() { return counters; }
- /** Get task type */
+ /** Gets task type. */
public TaskType getTaskType() {
return taskType;
}
- /** Get task status */
+ /**
+ * Gets task status.
+ * @return task status
+ */
public String getTaskStatus() { return status.toString(); }
- /** Get event type */
+ /** Gets event type. */
public EventType getEventType() {
return EventType.TASK_FINISHED;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
index cbca3c8..9434d46 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
@@ -298,10 +298,10 @@ public class TestMRTimelineEventHandling {
" does not exist.",
jobEventFile.exists());
verifyEntity(jobEventFile, EventType.JOB_FINISHED.name(),
- true, false, null);
+ true, false, null, false);
Set<String> cfgsToCheck = Sets.newHashSet("dummy_conf1", "dummy_conf2",
"huge_dummy_conf1", "huge_dummy_conf2");
- verifyEntity(jobEventFile, null, false, true, cfgsToCheck);
+ verifyEntity(jobEventFile, null, false, true, cfgsToCheck, false);
// for this test, we expect MR job metrics are published in YARN_APPLICATION
String outputAppDir =
@@ -322,8 +322,8 @@ public class TestMRTimelineEventHandling {
"appEventFilePath: " + appEventFilePath +
" does not exist.",
appEventFile.exists());
- verifyEntity(appEventFile, null, true, false, null);
- verifyEntity(appEventFile, null, false, true, cfgsToCheck);
+ verifyEntity(appEventFile, null, true, false, null, false);
+ verifyEntity(appEventFile, null, false, true, cfgsToCheck, false);
// check for task event file
String outputDirTask =
@@ -344,7 +344,7 @@ public class TestMRTimelineEventHandling {
" does not exist.",
taskEventFile.exists());
verifyEntity(taskEventFile, EventType.TASK_FINISHED.name(),
- true, false, null);
+ true, false, null, true);
// check for task attempt event file
String outputDirTaskAttempt =
@@ -363,7 +363,7 @@ public class TestMRTimelineEventHandling {
Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath +
" does not exist.", taskAttemptEventFile.exists());
verifyEntity(taskAttemptEventFile, EventType.MAP_ATTEMPT_FINISHED.name(),
- true, false, null);
+ true, false, null, true);
}
/**
@@ -380,12 +380,13 @@ public class TestMRTimelineEventHandling {
* @throws IOException
*/
private void verifyEntity(File entityFile, String eventId,
- boolean chkMetrics, boolean chkCfg, Set<String> cfgsToVerify)
- throws IOException {
+ boolean chkMetrics, boolean chkCfg, Set<String> cfgsToVerify,
+ boolean checkIdPrefix) throws IOException {
BufferedReader reader = null;
String strLine;
try {
reader = new BufferedReader(new FileReader(entityFile));
+ long idPrefix = -1;
while ((strLine = reader.readLine()) != null) {
if (strLine.trim().length() > 0) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
@@ -394,6 +395,19 @@ public class TestMRTimelineEventHandling {
strLine.trim(),
org.apache.hadoop.yarn.api.records.timelineservice.
TimelineEntity.class);
+
+ LOG.info("strLine.trim()= " + strLine.trim());
+ if (checkIdPrefix) {
+ Assert.assertTrue("Entity ID prefix expected to be > 0" ,
+ entity.getIdPrefix() > 0);
+ if (idPrefix == -1) {
+ idPrefix = entity.getIdPrefix();
+ } else {
+ Assert.assertEquals("Entity ID prefix should be same across " +
+ "each publish of same entity",
+ idPrefix, entity.getIdPrefix());
+ }
+ }
if (eventId == null) {
// Job metrics are published without any events for
// ApplicationEntity. There is also possibility that some other
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index ab4607a..a02af70 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -104,6 +104,8 @@ import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.log4j.LogManager;
@@ -314,6 +316,17 @@ public class ApplicationMaster {
Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
/**
+ * Container start times used to set id prefix while publishing entity
+ * to ATSv2.
+ */
+ private final ConcurrentMap<ContainerId, Long> containerStartTimes =
+ new ConcurrentHashMap<ContainerId, Long>();
+
+ private ConcurrentMap<ContainerId, Long> getContainerStartTimes() {
+ return containerStartTimes;
+ }
+
+ /**
* @param args Command line args
*/
public static void main(String[] args) {
@@ -866,7 +879,15 @@ public class ApplicationMaster {
+ containerStatus.getContainerId());
}
if (timelineServiceV2Enabled) {
- publishContainerEndEventOnTimelineServiceV2(containerStatus);
+ Long containerStartTime =
+ containerStartTimes.get(containerStatus.getContainerId());
+ if (containerStartTime == null) {
+ containerStartTime = SystemClock.getInstance().getTime();
+ containerStartTimes.put(containerStatus.getContainerId(),
+ containerStartTime);
+ }
+ publishContainerEndEventOnTimelineServiceV2(containerStatus,
+ containerStartTime);
} else if (timelineServiceV1Enabled) {
publishContainerEndEvent(timelineClient, containerStatus, domainId,
appSubmitterUgi);
@@ -994,8 +1015,10 @@ public class ApplicationMaster {
containerId, container.getNodeId());
}
if (applicationMaster.timelineServiceV2Enabled) {
- applicationMaster
- .publishContainerStartEventOnTimelineServiceV2(container);
+ long startTime = SystemClock.getInstance().getTime();
+ applicationMaster.getContainerStartTimes().put(containerId, startTime);
+ applicationMaster.publishContainerStartEventOnTimelineServiceV2(
+ container, startTime);
} else if (applicationMaster.timelineServiceV1Enabled) {
applicationMaster.publishContainerStartEvent(
applicationMaster.timelineClient, container,
@@ -1356,24 +1379,24 @@ public class ApplicationMaster {
}
private void publishContainerStartEventOnTimelineServiceV2(
- Container container) {
+ Container container, long startTime) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
entity =
new org.apache.hadoop.yarn.api.records.timelineservice.
TimelineEntity();
entity.setId(container.getId().toString());
entity.setType(DSEntity.DS_CONTAINER.toString());
- long ts = System.currentTimeMillis();
- entity.setCreatedTime(ts);
+ entity.setCreatedTime(startTime);
entity.addInfo("user", appSubmitterUgi.getShortUserName());
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
- event.setTimestamp(ts);
+ event.setTimestamp(startTime);
event.setId(DSEvent.DS_CONTAINER_START.toString());
event.addInfo("Node", container.getNodeId().toString());
event.addInfo("Resources", container.getResource().toString());
entity.addEvent(event);
+ entity.setIdPrefix(TimelineServiceHelper.invertLong(startTime));
try {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -1391,7 +1414,7 @@ public class ApplicationMaster {
}
private void publishContainerEndEventOnTimelineServiceV2(
- final ContainerStatus container) {
+ final ContainerStatus container, long containerStartTime) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
entity =
new org.apache.hadoop.yarn.api.records.timelineservice.
@@ -1407,6 +1430,7 @@ public class ApplicationMaster {
event.addInfo("State", container.getState().name());
event.addInfo("Exit Status", container.getExitStatus());
entity.addEvent(event);
+ entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
try {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -1441,6 +1465,8 @@ public class ApplicationMaster {
event.setId(appEvent.toString());
event.setTimestamp(ts);
entity.addEvent(event);
+ entity.setIdPrefix(
+ TimelineServiceHelper.invertLong(appAttemptID.getAttemptId()));
try {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index ef21c87..47485ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -64,7 +64,9 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEvent;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
@@ -81,6 +83,7 @@ import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
@@ -523,15 +526,31 @@ public class TestDistributedShell {
"appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ "_000001"
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
- verifyEntityTypeFileExists(basePath, "DS_APP_ATTEMPT",
- appTimestampFileName);
-
- // Verify DS_CONTAINER entities posted by the client
+ File dsAppAttemptEntityFile = verifyEntityTypeFileExists(basePath,
+ "DS_APP_ATTEMPT", appTimestampFileName);
+ // Check if required events are published and same idprefix is sent for
+ // on each publish.
+ verifyEntityForTimelineV2(dsAppAttemptEntityFile,
+ DSEvent.DS_APP_ATTEMPT_START.toString(), 1, 1, 0, true);
+ // to avoid race condition of testcase, atleast check 40 times with sleep
+ // of 50ms
+ verifyEntityForTimelineV2(dsAppAttemptEntityFile,
+ DSEvent.DS_APP_ATTEMPT_END.toString(), 1, 40, 50, true);
+
+ // Verify DS_CONTAINER entities posted by the client.
String containerTimestampFileName =
"container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ "_01_000002.thist";
- verifyEntityTypeFileExists(basePath, "DS_CONTAINER",
- containerTimestampFileName);
+ File dsContainerEntityFile = verifyEntityTypeFileExists(basePath,
+ "DS_CONTAINER", containerTimestampFileName);
+ // Check if required events are published and same idprefix is sent for
+ // on each publish.
+ verifyEntityForTimelineV2(dsContainerEntityFile,
+ DSEvent.DS_CONTAINER_START.toString(), 1, 1, 0, true);
+ // to avoid race condition of testcase, atleast check 40 times with sleep
+ // of 50ms
+ verifyEntityForTimelineV2(dsContainerEntityFile,
+ DSEvent.DS_CONTAINER_END.toString(), 1, 40, 50, true);
// Verify NM posting container metrics info.
String containerMetricsTimestampFileName =
@@ -541,29 +560,13 @@ public class TestDistributedShell {
File containerEntityFile = verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_CONTAINER.toString(),
containerMetricsTimestampFileName);
- Assert.assertEquals(
- "Container created event needs to be published atleast once",
- 1,
- getNumOfStringOccurrences(containerEntityFile,
- ContainerMetricsConstants.CREATED_EVENT_TYPE));
-
- // to avoid race condition of testcase, atleast check 4 times with sleep
- // of 500ms
- long numOfContainerFinishedOccurrences = 0;
- for (int i = 0; i < 4; i++) {
- numOfContainerFinishedOccurrences =
- getNumOfStringOccurrences(containerEntityFile,
- ContainerMetricsConstants.FINISHED_EVENT_TYPE);
- if (numOfContainerFinishedOccurrences > 0) {
- break;
- } else {
- Thread.sleep(500L);
- }
- }
- Assert.assertEquals(
- "Container finished event needs to be published atleast once",
- 1,
- numOfContainerFinishedOccurrences);
+ verifyEntityForTimelineV2(containerEntityFile,
+ ContainerMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, true);
+
+ // to avoid race condition of testcase, atleast check 40 times with sleep
+ // of 50ms
+ verifyEntityForTimelineV2(containerEntityFile,
+ ContainerMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, true);
// Verify RM posting Application life cycle Events are getting published
String appMetricsTimestampFileName =
@@ -573,29 +576,14 @@ public class TestDistributedShell {
verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_APPLICATION.toString(),
appMetricsTimestampFileName);
- Assert.assertEquals(
- "Application created event should be published atleast once",
- 1,
- getNumOfStringOccurrences(appEntityFile,
- ApplicationMetricsConstants.CREATED_EVENT_TYPE));
-
- // to avoid race condition of testcase, atleast check 4 times with sleep
- // of 500ms
- long numOfStringOccurrences = 0;
- for (int i = 0; i < 4; i++) {
- numOfStringOccurrences =
- getNumOfStringOccurrences(appEntityFile,
- ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
- if (numOfStringOccurrences > 0) {
- break;
- } else {
- Thread.sleep(500L);
- }
- }
- Assert.assertEquals(
- "Application finished event should be published atleast once",
- 1,
- numOfStringOccurrences);
+ // No need to check idprefix for app.
+ verifyEntityForTimelineV2(appEntityFile,
+ ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, false);
+
+ // to avoid race condition of testcase, atleast check 40 times with sleep
+ // of 50ms
+ verifyEntityForTimelineV2(appEntityFile,
+ ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, false);
// Verify RM posting AppAttempt life cycle Events are getting published
String appAttemptMetricsTimestampFileName =
@@ -606,17 +594,10 @@ public class TestDistributedShell {
verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
appAttemptMetricsTimestampFileName);
- Assert.assertEquals(
- "AppAttempt register event should be published atleast once",
- 1,
- getNumOfStringOccurrences(appAttemptEntityFile,
- AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE));
-
- Assert.assertEquals(
- "AppAttempt finished event should be published atleast once",
- 1,
- getNumOfStringOccurrences(appAttemptEntityFile,
- AppAttemptMetricsConstants.FINISHED_EVENT_TYPE));
+ verifyEntityForTimelineV2(appAttemptEntityFile,
+ AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1, 1, 0, true);
+ verifyEntityForTimelineV2(appAttemptEntityFile,
+ AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, 1, 0, true);
} finally {
FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
}
@@ -636,22 +617,64 @@ public class TestDistributedShell {
return entityFile;
}
- private long getNumOfStringOccurrences(File entityFile, String searchString)
- throws IOException {
- BufferedReader reader = null;
- String strLine;
+ /**
+ * Checks the events and idprefix published for an entity.
+ *
+ * @param entityFile Entity file.
+ * @param expectedEvent Expected event Id.
+ * @param numOfExpectedEvent Number of expected occurences of expected event
+ * id.
+ * @param checkTimes Number of times to check.
+ * @param sleepTime Sleep time for each iteration.
+ * @param checkIdPrefix Whether to check idprefix.
+ * @throws IOException if entity file reading fails.
+ * @throws InterruptedException if sleep is interrupted.
+ */
+ private void verifyEntityForTimelineV2(File entityFile, String expectedEvent,
+ long numOfExpectedEvent, int checkTimes, long sleepTime,
+ boolean checkIdPrefix) throws IOException, InterruptedException {
long actualCount = 0;
- try {
- reader = new BufferedReader(new FileReader(entityFile));
- while ((strLine = reader.readLine()) != null) {
- if (strLine.trim().contains(searchString)) {
- actualCount++;
+ for (int i = 0; i < checkTimes; i++) {
+ BufferedReader reader = null;
+ String strLine = null;
+ actualCount = 0;
+ try {
+ reader = new BufferedReader(new FileReader(entityFile));
+ long idPrefix = -1;
+ while ((strLine = reader.readLine()) != null) {
+ String entityLine = strLine.trim();
+ if (entityLine.isEmpty()) {
+ continue;
+ }
+ if (entityLine.contains(expectedEvent)) {
+ actualCount++;
+ }
+ if (checkIdPrefix) {
+ TimelineEntity entity = FileSystemTimelineReaderImpl.
+ getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
+ Assert.assertTrue("Entity ID prefix expected to be > 0" ,
+ entity.getIdPrefix() > 0);
+ if (idPrefix == -1) {
+ idPrefix = entity.getIdPrefix();
+ } else {
+ Assert.assertEquals("Entity ID prefix should be same across " +
+ "each publish of same entity",
+ idPrefix, entity.getIdPrefix());
+ }
+ }
}
+ } finally {
+ reader.close();
+ }
+ if (numOfExpectedEvent == actualCount) {
+ break;
+ }
+ if (sleepTime > 0 && i < checkTimes - 1) {
+ Thread.sleep(sleepTime);
}
- } finally {
- reader.close();
}
- return actualCount;
+ Assert.assertEquals("Unexpected number of " + expectedEvent +
+ " event published.", numOfExpectedEvent, actualCount);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index ef36ba6..c7880d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -155,6 +155,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProv
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@@ -1052,10 +1053,11 @@ public class ContainerManagerImpl extends CompositeService implements
Credentials credentials =
YarnServerSecurityUtils.parseCredentials(launchContext);
+ long containerStartTime = SystemClock.getInstance().getTime();
Container container =
new ContainerImpl(getConfig(), this.dispatcher,
launchContext, credentials, metrics, containerTokenIdentifier,
- context);
+ context, containerStartTime);
ApplicationId applicationID =
containerId.getApplicationAttemptId().getApplicationId();
if (context.getContainers().putIfAbsent(containerId, container) != null) {
@@ -1112,7 +1114,7 @@ public class ContainerManagerImpl extends CompositeService implements
}
this.context.getNMStateStore().storeContainer(containerId,
- containerTokenIdentifier.getVersion(), request);
+ containerTokenIdentifier.getVersion(), containerStartTime, request);
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
index 0a8ffdf..09c946b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
@@ -23,12 +23,16 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
public class ApplicationContainerFinishedEvent extends ApplicationEvent {
private ContainerStatus containerStatus;
+ // Required by NMTimelinePublisher.
+ private long containerStartTime;
- public ApplicationContainerFinishedEvent(ContainerStatus containerStatus) {
+ public ApplicationContainerFinishedEvent(ContainerStatus containerStatus,
+ long containerStartTs) {
super(containerStatus.getContainerId().getApplicationAttemptId().
getApplicationId(),
ApplicationEventType.APPLICATION_CONTAINER_FINISHED);
this.containerStatus = containerStatus;
+ this.containerStartTime = containerStartTs;
}
public ContainerId getContainerID() {
@@ -39,4 +43,7 @@ public class ApplicationContainerFinishedEvent extends ApplicationEvent {
return containerStatus;
}
+ public long getContainerStartTime() {
+ return containerStartTime;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index f6e567c..ac9fbb7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -37,6 +37,8 @@ public interface Container extends EventHandler<ContainerEvent> {
ContainerId getContainerId();
+ long getContainerStartTime();
+
Resource getResource();
ContainerTokenIdentifier getContainerTokenIdentifier();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d6c749/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 6af8653..772b6e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -173,11 +173,11 @@ public class ContainerImpl implements Container {
/** The NM-wide configuration - not specific to this container */
private final Configuration daemonConf;
+ private final long startTime;
private static final Logger LOG =
LoggerFactory.getLogger(ContainerImpl.class);
-
// whether container has been recovered after a restart
private RecoveredContainerStatus recoveredStatus =
RecoveredContainerStatus.REQUESTED;
@@ -190,6 +190,16 @@ public class ContainerImpl implements Container {
ContainerLaunchContext launchContext, Credentials creds,
NodeManagerMetrics metrics,
ContainerTokenIdentifier containerTokenIdentifier, Context context) {
+ this(conf, dispatcher, launchContext, creds, metrics,
+ containerTokenIdentifier, context, SystemClock.getInstance().getTime());
+ }
+
+ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
+ ContainerLaunchContext launchContext, Credentials creds,
+ NodeManagerMetrics metrics,
+ ContainerTokenIdentifier containerTokenIdentifier, Context context,
+ long startTs) {
+ this.startTime = startTs;
this.daemonConf = conf;
this.dispatcher = dispatcher;
this.stateStore = context.getNMStateStore();
@@ -263,7 +273,7 @@ public class ContainerImpl implements Container {
ContainerTokenIdentifier containerTokenIdentifier, Context context,
RecoveredContainerState rcs) {
this(conf, dispatcher, launchContext, creds, metrics,
- containerTokenIdentifier, context);
+ containerTokenIdentifier, context, rcs.getStartTime());
this.recoveredStatus = rcs.getStatus();
this.exitCode = rcs.getExitCode();
this.recoveredAsKilled = rcs.getKilled();
@@ -631,6 +641,11 @@ public class ContainerImpl implements Container {
}
@Override
+ public long getContainerStartTime() {
+ return this.startTime;
+ }
+
+ @Override
public Resource getResource() {
return Resources.clone(
this.containerTokenIdentifier.getResource());
@@ -694,7 +709,8 @@ public class ContainerImpl implements Container {
EventHandler eventHandler = dispatcher.getEventHandler();
ContainerStatus containerStatus = cloneAndGetContainerStatus();
- eventHandler.handle(new ApplicationContainerFinishedEvent(containerStatus));
+ eventHandler.handle(
+ new ApplicationContainerFinishedEvent(containerStatus, startTime));
// Tell the scheduler the container is Done
eventHandler.handle(new ContainerSchedulerEvent(this,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org