You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/04/22 01:32:37 UTC
[2/2] hadoop git commit: MAPREDUCE-6327. Made MR AM use timeline
service v2 API to write history events and counters. Contributed by Junping
Du.
MAPREDUCE-6327. Made MR AM use timeline service v2 API to write history events and counters. Contributed by Junping Du.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5eeb2b15
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5eeb2b15
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5eeb2b15
Branch: refs/heads/YARN-2928
Commit: 5eeb2b156f8e108205945f0a1d06873cb51c3527
Parents: ce6aa1b
Author: Zhijie Shen <zj...@apache.org>
Authored: Tue Apr 21 16:31:33 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Tue Apr 21 16:31:33 2015 -0700
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 15 ++
.../jobhistory/JobHistoryEventHandler.java | 258 ++++++++++++++++---
.../hadoop/mapreduce/v2/app/MRAppMaster.java | 24 ++
.../v2/app/rm/RMContainerAllocator.java | 9 +
.../hadoop/mapreduce/jobhistory/TestEvents.java | 8 +-
.../jobhistory/TestJobHistoryEventHandler.java | 9 +-
.../apache/hadoop/mapreduce/MRJobConfig.java | 5 +
.../mapreduce/jobhistory/AMStartedEvent.java | 18 ++
.../mapreduce/jobhistory/HistoryEvent.java | 4 +
.../mapreduce/jobhistory/JobFinishedEvent.java | 25 ++
.../jobhistory/JobInfoChangeEvent.java | 11 +
.../mapreduce/jobhistory/JobInitedEvent.java | 14 +
.../jobhistory/JobPriorityChangeEvent.java | 10 +
.../jobhistory/JobQueueChangeEvent.java | 10 +
.../jobhistory/JobStatusChangedEvent.java | 10 +
.../mapreduce/jobhistory/JobSubmittedEvent.java | 23 ++
.../JobUnsuccessfulCompletionEvent.java | 16 ++
.../jobhistory/MapAttemptFinishedEvent.java | 24 +-
.../jobhistory/NormalizedResourceEvent.java | 11 +
.../jobhistory/ReduceAttemptFinishedEvent.java | 25 +-
.../jobhistory/TaskAttemptFinishedEvent.java | 19 ++
.../jobhistory/TaskAttemptStartedEvent.java | 18 ++
.../TaskAttemptUnsuccessfulCompletionEvent.java | 24 ++
.../mapreduce/jobhistory/TaskFailedEvent.java | 19 ++
.../mapreduce/jobhistory/TaskFinishedEvent.java | 18 ++
.../mapreduce/jobhistory/TaskStartedEvent.java | 12 +
.../mapreduce/jobhistory/TaskUpdatedEvent.java | 10 +
.../mapreduce/util/JobHistoryEventUtils.java | 51 ++++
.../src/main/resources/mapred-default.xml | 7 +
.../mapred/TestMRTimelineEventHandling.java | 163 +++++++++++-
.../hadoop/mapreduce/v2/MiniMRYarnCluster.java | 21 +-
31 files changed, 839 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index ccdf6d6..1242547 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -1,5 +1,20 @@
Hadoop MapReduce Change Log
+Branch YARN-2928: Timeline Server Next Generation: Phase 1
+
+ INCOMPATIBLE CHANGES
+
+ NEW FEATURES
+
+ MAPREDUCE-6327. Made MR AM use timeline service v2 API to write history
+ events and counters. (Junping Du via zjshen)
+
+ IMPROVEMENTS
+
+ OPTIMIZATIONS
+
+ BUG FIXES
+
Trunk (Unreleased)
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index a0e7041..6d72095 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -19,6 +19,9 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -49,11 +52,13 @@ import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
@@ -67,10 +72,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.node.ArrayNode;
-import org.codehaus.jackson.node.ObjectNode;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* The job history events get routed to this class. This class writes the Job
@@ -116,14 +119,24 @@ public class JobHistoryEventHandler extends AbstractService
protected static final Map<JobId, MetaInfo> fileMap =
Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
+
+ // For posting entities in new timeline service in a non-blocking way
+ // TODO YARN-3367 replace with event loop in TimelineClient.
+ private static ExecutorService threadPool =
+ Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
+ .build());
// should job completion be force when the AM shuts down?
protected volatile boolean forceJobCompletion = false;
protected TimelineClient timelineClient;
+
+ private boolean newTimelineServiceEnabled = false;
private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
+ private static String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE = "MAPREDUCE_TASK_ATTEMPT";
public JobHistoryEventHandler(AppContext context, int startCount) {
super("JobHistoryEventHandler");
@@ -243,13 +256,22 @@ public class JobHistoryEventHandler extends AbstractService
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
+ // TODO replace MR specific configurations on timeline service with getting
+ // configuration from RM through registerApplicationMaster() in
+ // ApplicationMasterProtocol with return value for timeline service
+ // configuration status: off, on_with_v1 or on_with_v2.
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
- timelineClient = TimelineClient.createTimelineClient();
+
+ timelineClient =
+ ((MRAppMaster.RunningAppContext)context).getTimelineClient();
timelineClient.init(conf);
- LOG.info("Timeline service is enabled");
+ newTimelineServiceEnabled = conf.getBoolean(
+ MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED,
+ MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED);
+ LOG.info("Timeline service is enabled: " + (newTimelineServiceEnabled? "v2" : "v1"));
LOG.info("Emitting job history data to the timeline server is enabled");
} else {
LOG.info("Timeline service is not enabled");
@@ -409,9 +431,26 @@ public class JobHistoryEventHandler extends AbstractService
if (timelineClient != null) {
timelineClient.stop();
}
+ shutdownAndAwaitTermination();
LOG.info("Stopped JobHistoryEventHandler. super.stop()");
super.serviceStop();
}
+
+ // TODO remove threadPool after adding non-blocking call in TimelineClient
+ private static void shutdownAndAwaitTermination() {
+ threadPool.shutdown();
+ try {
+ if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+ threadPool.shutdownNow();
+ if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
+ LOG.error("ThreadPool did not terminate");
+ }
+ } catch (InterruptedException ie) {
+ threadPool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
protected EventWriter createEventWriter(Path historyFilePath)
throws IOException {
@@ -562,8 +601,13 @@ public class JobHistoryEventHandler extends AbstractService
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
event.getJobID());
if (timelineClient != null) {
- processEventForTimelineServer(historyEvent, event.getJobID(),
- event.getTimestamp());
+ if (newTimelineServiceEnabled) {
+ processEventForNewTimelineService(historyEvent, event.getJobID(),
+ event.getTimestamp());
+ } else {
+ processEventForTimelineServer(historyEvent, event.getJobID(),
+ event.getTimestamp());
+ }
}
if (LOG.isDebugEnabled()) {
LOG.debug("In HistoryEventHandler "
@@ -804,11 +848,11 @@ public class JobHistoryEventHandler extends AbstractService
tEvent.addEventInfo("FINISHED_MAPS", jfe.getFinishedMaps());
tEvent.addEventInfo("FINISHED_REDUCES", jfe.getFinishedReduces());
tEvent.addEventInfo("MAP_COUNTERS_GROUPS",
- countersToJSON(jfe.getTotalCounters()));
+ JobHistoryEventUtils.countersToJSON(jfe.getTotalCounters()));
tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS",
- countersToJSON(jfe.getReduceCounters()));
+ JobHistoryEventUtils.countersToJSON(jfe.getReduceCounters()));
tEvent.addEventInfo("TOTAL_COUNTERS_GROUPS",
- countersToJSON(jfe.getTotalCounters()));
+ JobHistoryEventUtils.countersToJSON(jfe.getTotalCounters()));
tEvent.addEventInfo("JOB_STATUS", JobState.SUCCEEDED.toString());
tEntity.addEvent(tEvent);
tEntity.setEntityId(jobId.toString());
@@ -834,7 +878,7 @@ public class JobHistoryEventHandler extends AbstractService
tfe.getFailedAttemptID() == null ?
"" : tfe.getFailedAttemptID().toString());
tEvent.addEventInfo("COUNTERS_GROUPS",
- countersToJSON(tfe.getCounters()));
+ JobHistoryEventUtils.countersToJSON(tfe.getCounters()));
tEntity.addEvent(tEvent);
tEntity.setEntityId(tfe.getTaskId().toString());
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
@@ -852,7 +896,7 @@ public class JobHistoryEventHandler extends AbstractService
TaskFinishedEvent tfe2 = (TaskFinishedEvent) event;
tEvent.addEventInfo("TASK_TYPE", tfe2.getTaskType().toString());
tEvent.addEventInfo("COUNTERS_GROUPS",
- countersToJSON(tfe2.getCounters()));
+ JobHistoryEventUtils.countersToJSON(tfe2.getCounters()));
tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime());
tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
@@ -874,7 +918,6 @@ public class JobHistoryEventHandler extends AbstractService
tEvent.addEventInfo("START_TIME", tase.getStartTime());
tEvent.addEventInfo("HTTP_PORT", tase.getHttpPort());
tEvent.addEventInfo("TRACKER_NAME", tase.getTrackerName());
- tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString());
tEvent.addEventInfo("SHUFFLE_PORT", tase.getShufflePort());
tEvent.addEventInfo("CONTAINER_ID", tase.getContainerId() == null ?
"" : tase.getContainerId().toString());
@@ -907,7 +950,7 @@ public class JobHistoryEventHandler extends AbstractService
tEvent.addEventInfo("SORT_FINISH_TIME", tauce.getFinishTime());
tEvent.addEventInfo("MAP_FINISH_TIME", tauce.getFinishTime());
tEvent.addEventInfo("COUNTERS_GROUPS",
- countersToJSON(tauce.getCounters()));
+ JobHistoryEventUtils.countersToJSON(tauce.getCounters()));
tEntity.addEvent(tEvent);
tEntity.setEntityId(tauce.getTaskId().toString());
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
@@ -921,7 +964,7 @@ public class JobHistoryEventHandler extends AbstractService
tEvent.addEventInfo("STATE", mafe.getState());
tEvent.addEventInfo("MAP_FINISH_TIME", mafe.getMapFinishTime());
tEvent.addEventInfo("COUNTERS_GROUPS",
- countersToJSON(mafe.getCounters()));
+ JobHistoryEventUtils.countersToJSON(mafe.getCounters()));
tEvent.addEventInfo("HOSTNAME", mafe.getHostname());
tEvent.addEventInfo("PORT", mafe.getPort());
tEvent.addEventInfo("RACK_NAME", mafe.getRackName());
@@ -943,7 +986,7 @@ public class JobHistoryEventHandler extends AbstractService
tEvent.addEventInfo("SHUFFLE_FINISH_TIME", rafe.getShuffleFinishTime());
tEvent.addEventInfo("SORT_FINISH_TIME", rafe.getSortFinishTime());
tEvent.addEventInfo("COUNTERS_GROUPS",
- countersToJSON(rafe.getCounters()));
+ JobHistoryEventUtils.countersToJSON(rafe.getCounters()));
tEvent.addEventInfo("HOSTNAME", rafe.getHostname());
tEvent.addEventInfo("PORT", rafe.getPort());
tEvent.addEventInfo("RACK_NAME", rafe.getRackName());
@@ -962,7 +1005,7 @@ public class JobHistoryEventHandler extends AbstractService
tEvent.addEventInfo("STATUS", tafe.getTaskStatus());
tEvent.addEventInfo("STATE", tafe.getState());
tEvent.addEventInfo("COUNTERS_GROUPS",
- countersToJSON(tafe.getCounters()));
+ JobHistoryEventUtils.countersToJSON(tafe.getCounters()));
tEvent.addEventInfo("HOSTNAME", tafe.getHostname());
tEntity.addEvent(tEvent);
tEntity.setEntityId(tafe.getTaskId().toString());
@@ -988,37 +1031,172 @@ public class JobHistoryEventHandler extends AbstractService
default:
break;
}
-
+
try {
timelineClient.putEntities(tEntity);
- } catch (IOException ex) {
- LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
- + "Server", ex);
- } catch (YarnException ex) {
+ } catch (IOException|YarnException ex) {
LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
+ "Server", ex);
}
}
-
- @Private
- public JsonNode countersToJSON(Counters counters) {
- ObjectMapper mapper = new ObjectMapper();
- ArrayNode nodes = mapper.createArrayNode();
- if (counters != null) {
- for (CounterGroup counterGroup : counters) {
- ObjectNode groupNode = nodes.addObject();
- groupNode.put("NAME", counterGroup.getName());
- groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName());
- ArrayNode countersNode = groupNode.putArray("COUNTERS");
- for (Counter counter : counterGroup) {
- ObjectNode counterNode = countersNode.addObject();
- counterNode.put("NAME", counter.getName());
- counterNode.put("DISPLAY_NAME", counter.getDisplayName());
- counterNode.put("VALUE", counter.getValue());
+
+ private void putEntityWithoutBlocking(final TimelineClient timelineClient,
+ final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity) {
+ Runnable publishWrapper = new Runnable() {
+ public void run() {
+ try {
+ timelineClient.putEntities(entity);
+ } catch (IOException|YarnException e) {
+ LOG.error("putEntityNonBlocking get failed: " + e);
+ throw new RuntimeException(e.toString());
}
}
+ };
+ threadPool.execute(publishWrapper);
+ }
+
+ // create JobEntity from HistoryEvent with adding other info, like:
+ // jobId, timestamp and entityType.
+ private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+ createJobEntity(HistoryEvent event, long timestamp, JobId jobId,
+ String entityType) {
+
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
+ createBaseEntity(event, timestamp, entityType);
+ entity.setId(jobId.toString());
+ return entity;
+ }
+
+ // create BaseEntity from HistoryEvent with adding other info, like:
+ // timestamp and entityType.
+ private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+ createBaseEntity(HistoryEvent event, long timestamp, String entityType) {
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent tEvent =
+ event.toTimelineEvent();
+ tEvent.setTimestamp(timestamp);
+
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
+ new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
+ entity.addEvent(tEvent);
+ entity.setType(entityType);
+ return entity;
+ }
+
+ // create TaskEntity from HistoryEvent with adding other info, like:
+ // taskId, jobId, timestamp, entityType and relatedJobEntity.
+ private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+ createTaskEntity(HistoryEvent event, long timestamp, String taskId,
+ String entityType, String relatedJobEntity, JobId jobId) {
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
+ createBaseEntity(event, timestamp, entityType);
+ entity.setId(taskId);
+ entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString());
+ return entity;
+ }
+
+ // create TaskAttemptEntity from HistoryEvent with adding other info, like:
+ // timestamp, taskAttemptId, entityType, relatedTaskEntity and taskId.
+ private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+ createTaskAttemptEntity(HistoryEvent event, long timestamp,
+ String taskAttemptId, String entityType, String relatedTaskEntity,
+ String taskId) {
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
+ createBaseEntity(event, timestamp, entityType);
+ entity.setId(taskAttemptId);
+ entity.addIsRelatedToEntity(relatedTaskEntity, taskId);
+ return entity;
+ }
+
+ private void processEventForNewTimelineService(HistoryEvent event, JobId jobId,
+ long timestamp) {
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity tEntity = null;
+ String taskId = null;
+ String taskAttemptId = null;
+
+ switch (event.getEventType()) {
+ // Handle job events
+ case JOB_SUBMITTED:
+ case JOB_STATUS_CHANGED:
+ case JOB_INFO_CHANGED:
+ case JOB_INITED:
+ case JOB_PRIORITY_CHANGED:
+ case JOB_QUEUE_CHANGED:
+ case JOB_FAILED:
+ case JOB_KILLED:
+ case JOB_ERROR:
+ case JOB_FINISHED:
+ case AM_STARTED:
+ case NORMALIZED_RESOURCE:
+ break;
+ // Handle task events
+ case TASK_STARTED:
+ taskId = ((TaskStartedEvent)event).getTaskId().toString();
+ break;
+ case TASK_FAILED:
+ taskId = ((TaskFailedEvent)event).getTaskId().toString();
+ break;
+ case TASK_UPDATED:
+ taskId = ((TaskUpdatedEvent)event).getTaskId().toString();
+ break;
+ case TASK_FINISHED:
+ taskId = ((TaskFinishedEvent)event).getTaskId().toString();
+ break;
+ case MAP_ATTEMPT_STARTED:
+ case CLEANUP_ATTEMPT_STARTED:
+ case REDUCE_ATTEMPT_STARTED:
+ case SETUP_ATTEMPT_STARTED:
+ taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
+ taskAttemptId = ((TaskAttemptStartedEvent)event).
+ getTaskAttemptId().toString();
+ break;
+ case MAP_ATTEMPT_FAILED:
+ case CLEANUP_ATTEMPT_FAILED:
+ case REDUCE_ATTEMPT_FAILED:
+ case SETUP_ATTEMPT_FAILED:
+ case MAP_ATTEMPT_KILLED:
+ case CLEANUP_ATTEMPT_KILLED:
+ case REDUCE_ATTEMPT_KILLED:
+ case SETUP_ATTEMPT_KILLED:
+ taskId = ((TaskAttemptUnsuccessfulCompletionEvent)event).getTaskId().toString();
+ taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
+ getTaskAttemptId().toString();
+ break;
+ case MAP_ATTEMPT_FINISHED:
+ taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString();
+ taskAttemptId = ((MapAttemptFinishedEvent)event).getAttemptId().toString();
+ break;
+ case REDUCE_ATTEMPT_FINISHED:
+ taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString();
+ taskAttemptId = ((ReduceAttemptFinishedEvent)event).getAttemptId().toString();
+ break;
+ case SETUP_ATTEMPT_FINISHED:
+ case CLEANUP_ATTEMPT_FINISHED:
+ taskId = ((TaskAttemptFinishedEvent)event).getTaskId().toString();
+ taskAttemptId = ((TaskAttemptFinishedEvent)event).getAttemptId().toString();
+ break;
+ default:
+ LOG.warn("EventType: " + event.getEventType() + " cannot be recognized" +
+ " and handled by timeline service.");
+ return;
}
- return nodes;
+ if (taskId == null) {
+ // JobEntity
+ tEntity = createJobEntity(event, timestamp, jobId,
+ MAPREDUCE_JOB_ENTITY_TYPE);
+ } else {
+ if (taskAttemptId == null) {
+ // TaskEntity
+ tEntity = createTaskEntity(event, timestamp, taskId,
+ MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE, jobId);
+ } else {
+ // TaskAttemptEntity
+ tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId,
+ MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE,
+ taskId);
+ }
+ }
+
+ putEntityWithoutBlocking(timelineClient, tEntity);
}
private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 9908ea5..9416554 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -133,6 +133,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -937,11 +938,29 @@ public class MRAppMaster extends CompositeService {
private final Configuration conf;
private final ClusterInfo clusterInfo = new ClusterInfo();
private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
+ private TimelineClient timelineClient = null;
public RunningAppContext(Configuration config) {
this.conf = config;
this.clientToAMTokenSecretManager =
new ClientToAMTokenSecretManager(appAttemptID, null);
+ if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
+ MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)
+ && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
+
+ boolean newTimelineServiceEnabled = conf.getBoolean(
+ MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED,
+ MRJobConfig.DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED);
+
+ if (newTimelineServiceEnabled) {
+ // create new version TimelineClient
+ timelineClient = TimelineClient.createTimelineClient(
+ appAttemptID.getApplicationId());
+ } else {
+ timelineClient = TimelineClient.createTimelineClient();
+ }
+ }
}
@Override
@@ -1026,6 +1045,11 @@ public class MRAppMaster extends CompositeService {
public String getNMHostname() {
return nmHost;
}
+
+ // Get Timeline Collector's address (get sync from RM)
+ public TimelineClient getTimelineClient() {
+ return timelineClient;
+ }
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 8cdcaa8..96126a7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
@@ -761,6 +762,14 @@ public class RMContainerAllocator extends RMContainerRequestor
computeIgnoreBlacklisting();
handleUpdatedNodes(response);
+ String collectorAddr = response.getCollectorAddr();
+ MRAppMaster.RunningAppContext appContext =
+ (MRAppMaster.RunningAppContext)this.getContext();
+ if (collectorAddr != null && !collectorAddr.isEmpty()
+ && appContext.getTimelineClient() != null) {
+ appContext.getTimelineClient().setTimelineServiceAddress(
+ response.getCollectorAddr());
+ }
for (ContainerStatus cont : finishedContainers) {
LOG.info("Received completed container " + cont.getContainerId());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
index 00be4b8..51847a9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.junit.Test;
public class TestEvents {
@@ -402,7 +403,12 @@ public class TestEvents {
public void setDatum(Object datum) {
this.datum = datum;
}
-
+
+ @Override
+ public TimelineEvent toTimelineEvent() {
+ return null;
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index de260c9..7bbbb3e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -623,7 +624,7 @@ public class TestJobHistoryEventHandler {
group2.addCounter("MARTHA_JONES", "Martha Jones", 3);
group2.addCounter("DONNA_NOBLE", "Donna Noble", 2);
group2.addCounter("ROSE_TYLER", "Rose Tyler", 1);
- JsonNode jsonNode = jheh.countersToJSON(counters);
+ JsonNode jsonNode = JobHistoryEventUtils.countersToJSON(counters);
String jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
String expected = "[{\"NAME\":\"COMPANIONS\",\"DISPLAY_NAME\":\"Companions "
+ "of the Doctor\",\"COUNTERS\":[{\"NAME\":\"AMY_POND\",\"DISPLAY_NAME\""
@@ -646,19 +647,19 @@ public class TestJobHistoryEventHandler {
public void testCountersToJSONEmpty() throws Exception {
JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0);
Counters counters = null;
- JsonNode jsonNode = jheh.countersToJSON(counters);
+ JsonNode jsonNode = JobHistoryEventUtils.countersToJSON(counters);
String jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
String expected = "[]";
Assert.assertEquals(expected, jsonStr);
counters = new Counters();
- jsonNode = jheh.countersToJSON(counters);
+ jsonNode = JobHistoryEventUtils.countersToJSON(counters);
jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
expected = "[]";
Assert.assertEquals(expected, jsonStr);
counters.addGroup("DOCTORS", "Incarnations of the Doctor");
- jsonNode = jheh.countersToJSON(counters);
+ jsonNode = JobHistoryEventUtils.countersToJSON(counters);
jsonStr = new ObjectMapper().writeValueAsString(jsonNode);
expected = "[{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\":\"Incarnations of the "
+ "Doctor\",\"COUNTERS\":[]}]";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index e64b9b6..8176efd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -431,6 +431,11 @@ public interface MRJobConfig {
"mapreduce.job.emit-timeline-data";
public static final boolean DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA =
false;
+
+ public static final String MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
+ "mapreduce.job.new-timeline-service.enabled";
+ public static final boolean DEFAULT_MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED =
+ false;
public static final String MR_PREFIX = "yarn.app.mapreduce.";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
index d1a378b..1aae28f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.avro.util.Utf8;
@@ -156,4 +158,20 @@ public class AMStartedEvent implements HistoryEvent {
public EventType getEventType() {
return EventType.AM_STARTED;
}
+
+ @Override
+ public TimelineEvent toTimelineEvent() {
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+ tEvent.addInfo("APPLICATION_ATTEMPT_ID",
+ getAppAttemptId() == null ? "" : getAppAttemptId().toString());
+ tEvent.addInfo("CONTAINER_ID", getContainerId() == null ?
+ "" : getContainerId().toString());
+ tEvent.addInfo("NODE_MANAGER_HOST", getNodeManagerHost());
+ tEvent.addInfo("NODE_MANAGER_PORT", getNodeManagerPort());
+ tEvent.addInfo("NODE_MANAGER_HTTP_PORT", getNodeManagerHttpPort());
+ tEvent.addInfo("START_TIME", getStartTime());
+ return tEvent;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java
index a30748c..61ce217 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
/**
* Interface for event wrapper classes. Implementations each wrap an
@@ -37,4 +38,7 @@ public interface HistoryEvent {
/** Set the Avro datum wrapped by this. */
void setDatum(Object datum);
+
+ /** Map HistoryEvent to TimelineEvent */
+ TimelineEvent toTimelineEvent();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java
index e85805c..9eeb9e5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java
@@ -23,6 +23,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
/**
* Event to record successful completion of job
@@ -132,4 +135,26 @@ public class JobFinishedEvent implements HistoryEvent {
public Counters getReduceCounters() {
return reduceCounters;
}
+
+ @Override
+ public TimelineEvent toTimelineEvent() {
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+ tEvent.addInfo("FINISH_TIME", getFinishTime());
+ tEvent.addInfo("NUM_MAPS", getFinishedMaps());
+ tEvent.addInfo("NUM_REDUCES", getFinishedReduces());
+ tEvent.addInfo("FAILED_MAPS", getFailedMaps());
+ tEvent.addInfo("FAILED_REDUCES", getFailedReduces());
+ tEvent.addInfo("FINISHED_MAPS", getFinishedMaps());
+ tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces());
+ tEvent.addInfo("MAP_COUNTERS_GROUPS",
+ JobHistoryEventUtils.countersToJSON(getMapCounters()));
+ tEvent.addInfo("REDUCE_COUNTERS_GROUPS",
+ JobHistoryEventUtils.countersToJSON(getReduceCounters()));
+ tEvent.addInfo("TOTAL_COUNTERS_GROUPS",
+ JobHistoryEventUtils.countersToJSON(getTotalCounters()));
+ // TODO replace SUCCEEDED with JobState.SUCCEEDED.toString()
+ tEvent.addInfo("JOB_STATUS", "SUCCEEDED");
+ return tEvent;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java
index 12d9ffd..22925f2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java
@@ -23,6 +23,8 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
@@ -64,5 +66,14 @@ public class JobInfoChangeEvent implements HistoryEvent {
public EventType getEventType() {
return EventType.JOB_INFO_CHANGED;
}
+
+ @Override
+ public TimelineEvent toTimelineEvent() {
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+ tEvent.addInfo("SUBMIT_TIME", getSubmitTime());
+ tEvent.addInfo("LAUNCH_TIME", getLaunchTime());
+ return tEvent;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
index ed3ba1c..3f22bdf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
@@ -73,4 +75,16 @@ public class JobInitedEvent implements HistoryEvent {
}
/** Get whether the job's map and reduce stages were combined */
public boolean getUberized() { return datum.uberized; }
+
+ @Override
+ public TimelineEvent toTimelineEvent() {
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+ tEvent.addInfo("START_TIME", getLaunchTime());
+ tEvent.addInfo("STATUS", getStatus());
+ tEvent.addInfo("TOTAL_MAPS", getTotalMaps());
+ tEvent.addInfo("TOTAL_REDUCES", getTotalReduces());
+ tEvent.addInfo("UBERIZED", getUberized());
+ return tEvent;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java
index b95f3a1..1879192 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
@@ -62,5 +64,13 @@ public class JobPriorityChangeEvent implements HistoryEvent {
public EventType getEventType() {
return EventType.JOB_PRIORITY_CHANGED;
}
+
+ @Override
+ public TimelineEvent toTimelineEvent() {
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+ tEvent.addInfo("PRIORITY", getPriority().toString());
+ return tEvent;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java
index 86078e6..b9dd359 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobQueueChangeEvent.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
@SuppressWarnings("deprecation")
public class JobQueueChangeEvent implements HistoryEvent {
@@ -59,5 +61,13 @@ public class JobQueueChangeEvent implements HistoryEvent {
}
return null;
}
+
+ @Override
+ public TimelineEvent toTimelineEvent() {
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+ tEvent.addInfo("QUEUE_NAMES", getJobQueueName());
+ return tEvent;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java
index ed169c7..b8aa184 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java
@@ -23,6 +23,8 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
@@ -60,5 +62,13 @@ public class JobStatusChangedEvent implements HistoryEvent {
public EventType getEventType() {
return EventType.JOB_STATUS_CHANGED;
}
+
+ @Override
+ public TimelineEvent toTimelineEvent() {
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+ tEvent.addInfo("STATUS", getStatus());
+ return tEvent;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
index 24f820e..b6ef22f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
@@ -205,5 +207,26 @@ public class JobSubmittedEvent implements HistoryEvent {
}
/** Get the event type */
public EventType getEventType() { return EventType.JOB_SUBMITTED; }
+
+ @Override
+ public TimelineEvent toTimelineEvent() {
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+ tEvent.addInfo("SUBMIT_TIME", getSubmitTime());
+ tEvent.addInfo("QUEUE_NAME", getJobQueueName());
+ tEvent.addInfo("JOB_NAME", getJobName());
+ tEvent.addInfo("USER_NAME", getUserName());
+ tEvent.addInfo("JOB_CONF_PATH", getJobConfPath());
+ tEvent.addInfo("ACLS", getJobAcls());
+ tEvent.addInfo("JOB_QUEUE_NAME", getJobQueueName());
+ tEvent.addInfo("WORKFLOW_ID", getWorkflowId());
+ tEvent.addInfo("WORKFLOW_NAME", getWorkflowName());
+ tEvent.addInfo("WORKFLOW_NODE_NAME", getWorkflowNodeName());
+ tEvent.addInfo("WORKFLOW_ADJACENCIES",
+ getWorkflowAdjacencies());
+ tEvent.addInfo("WORKFLOW_TAGS", getWorkflowTags());
+
+ return tEvent;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
index 2d6a68e..86da874 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
@@ -24,6 +24,8 @@ import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import java.util.Collections;
@@ -117,4 +119,18 @@ public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
final CharSequence diagnostics = datum.getDiagnostics();
return diagnostics == null ? NODIAGS : diagnostics.toString();
}
+
+ @Override
+ public TimelineEvent toTimelineEvent() {
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+ tEvent.addInfo("FINISH_TIME", getFinishTime());
+ tEvent.addInfo("NUM_MAPS", getFinishedMaps());
+ tEvent.addInfo("NUM_REDUCES", getFinishedReduces());
+ tEvent.addInfo("JOB_STATUS", getStatus());
+ tEvent.addInfo("DIAGNOSTICS", getDiagnostics());
+ tEvent.addInfo("FINISHED_MAPS", getFinishedMaps());
+ tEvent.addInfo("FINISHED_REDUCES", getFinishedReduces());
+ return tEvent;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
index 62df2aa..36b9bf8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
@@ -26,6 +26,9 @@ import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
/**
* Event to record successful completion of a map attempt
@@ -33,7 +36,7 @@ import org.apache.hadoop.mapreduce.TaskType;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class MapAttemptFinishedEvent implements HistoryEvent {
+public class MapAttemptFinishedEvent implements HistoryEvent {
private MapAttemptFinished datum = null;
@@ -218,4 +221,23 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
return physMemKbytes;
}
+ @Override
+ public TimelineEvent toTimelineEvent() {
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+ tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+ tEvent.addInfo("FINISH_TIME", getFinishTime());
+ tEvent.addInfo("STATUS", getTaskStatus());
+ tEvent.addInfo("STATE", getState());
+ tEvent.addInfo("MAP_FINISH_TIME", getMapFinishTime());
+ tEvent.addInfo("COUNTERS_GROUPS",
+ JobHistoryEventUtils.countersToJSON(getCounters()));
+ tEvent.addInfo("HOSTNAME", getHostname());
+ tEvent.addInfo("PORT", getPort());
+ tEvent.addInfo("RACK_NAME", getRackName());
+ tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ?
+ "" : getAttemptId().toString());
+ return tEvent;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java
index b8f049c..daa454c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
/**
* Event to record the normalized map/reduce requirements.
@@ -71,4 +73,13 @@ public class NormalizedResourceEvent implements HistoryEvent {
public void setDatum(Object datum) {
throw new UnsupportedOperationException("Not a seriable object");
}
+
+ @Override
+ public TimelineEvent toTimelineEvent() {
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+ tEvent.addInfo("MEMORY", "" + getMemory());
+ tEvent.addInfo("TASK_TYPE", getTaskType());
+ return tEvent;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
index a779fca..806ca11 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
@@ -26,6 +26,9 @@ import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
/**
* Event to record successful completion of a reduce attempt
@@ -33,7 +36,7 @@ import org.apache.hadoop.mapreduce.TaskType;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class ReduceAttemptFinishedEvent implements HistoryEvent {
+public class ReduceAttemptFinishedEvent implements HistoryEvent {
private ReduceAttemptFinished datum = null;
@@ -222,5 +225,25 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
public int[] getPhysMemKbytes() {
return physMemKbytes;
}
+
+ @Override
+ public TimelineEvent toTimelineEvent() {
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+ tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+ tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ?
+ "" : getAttemptId().toString());
+ tEvent.addInfo("FINISH_TIME", getFinishTime());
+ tEvent.addInfo("STATUS", getTaskStatus());
+ tEvent.addInfo("STATE", getState());
+ tEvent.addInfo("SHUFFLE_FINISH_TIME", getShuffleFinishTime());
+ tEvent.addInfo("SORT_FINISH_TIME", getSortFinishTime());
+ tEvent.addInfo("COUNTERS_GROUPS",
+ JobHistoryEventUtils.countersToJSON(getCounters()));
+ tEvent.addInfo("HOSTNAME", getHostname());
+ tEvent.addInfo("PORT", getPort());
+ tEvent.addInfo("RACK_NAME", getRackName());
+ return tEvent;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
index 78b9ca9..44a23ba 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
@@ -25,6 +25,9 @@ import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
/**
* Event to record successful task completion
@@ -135,5 +138,21 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
? EventType.MAP_ATTEMPT_FINISHED
: EventType.REDUCE_ATTEMPT_FINISHED;
}
+
+ @Override
+ public TimelineEvent toTimelineEvent() {
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+ tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+ tEvent.addInfo("ATTEMPT_ID", getAttemptId() == null ?
+ "" : getAttemptId().toString());
+ tEvent.addInfo("FINISH_TIME", getFinishTime());
+ tEvent.addInfo("STATUS", getTaskStatus());
+ tEvent.addInfo("STATE", getState());
+ tEvent.addInfo("COUNTERS_GROUPS",
+ JobHistoryEventUtils.countersToJSON(getCounters()));
+ tEvent.addInfo("HOSTNAME", getHostname());
+ return tEvent;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
index 9b408c0..d21ce99 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
@@ -23,8 +23,10 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
@@ -129,5 +131,21 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
}
return null;
}
+
+ @Override
+ public TimelineEvent toTimelineEvent() {
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
+ tEvent.addInfo("TASK_TYPE", getTaskType().toString());
+ tEvent.addInfo("TASK_ATTEMPT_ID",
+ getTaskAttemptId().toString());
+ tEvent.addInfo("START_TIME", getStartTime());
+ tEvent.addInfo("HTTP_PORT", getHttpPort());
+ tEvent.addInfo("TRACKER_NAME", getTrackerName());
+ tEvent.addInfo("SHUFFLE_PORT", getShufflePort());
+ tEvent.addInfo("CONTAINER_ID", getContainerId() == null ?
+ "" : getContainerId().toString());
+ return tEvent;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
index 9b5617c..bee8f39 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
@@ -25,6 +25,9 @@ import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.mapred.ProgressSplitsBlock;
@@ -247,5 +250,26 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
public int[] getPhysMemKbytes() {
return physMemKbytes;
}
+
@Override
public TimelineEvent toTimelineEvent() {
  // Convert this history event into a timeline service v2 event.
  // The event id is the upper-cased event type name.
  TimelineEvent tEvent = new TimelineEvent();
  tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
  tEvent.addInfo("TASK_TYPE", getTaskType().toString());
  // Attempt id may be null; record the empty string in that case.
  tEvent.addInfo("TASK_ATTEMPT_ID", getTaskAttemptId() == null ?
      "" : getTaskAttemptId().toString());
  tEvent.addInfo("FINISH_TIME", getFinishTime());
  tEvent.addInfo("ERROR", getError());
  tEvent.addInfo("STATUS", getTaskStatus());
  tEvent.addInfo("HOSTNAME", getHostname());
  tEvent.addInfo("PORT", getPort());
  tEvent.addInfo("RACK_NAME", getRackName());
  // NOTE(review): shuffle/sort/map finish times are all reported as the
  // overall finish time here — presumably because the attempt did not
  // complete those phases. Confirm this is intentional and not a
  // copy/paste of getFinishTime().
  tEvent.addInfo("SHUFFLE_FINISH_TIME", getFinishTime());
  tEvent.addInfo("SORT_FINISH_TIME", getFinishTime());
  tEvent.addInfo("MAP_FINISH_TIME", getFinishTime());
  // Counters are serialized to a JSON array of counter groups.
  tEvent.addInfo("COUNTERS_GROUPS",
      JobHistoryEventUtils.countersToJSON(getCounters()));
  return tEvent;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
index 0d3aea6..a72ef8f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
@@ -20,10 +20,14 @@ package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
@@ -136,5 +140,20 @@ public class TaskFailedEvent implements HistoryEvent {
public EventType getEventType() {
return EventType.TASK_FAILED;
}
+
@Override
public TimelineEvent toTimelineEvent() {
  // Convert this history event into a timeline service v2 event.
  // The event id is the upper-cased event type name.
  TimelineEvent tEvent = new TimelineEvent();
  tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
  tEvent.addInfo("TASK_TYPE", getTaskType().toString());
  // STATUS is hard-coded to FAILED: this event only exists for failed tasks.
  tEvent.addInfo("STATUS", TaskStatus.State.FAILED.toString());
  tEvent.addInfo("FINISH_TIME", getFinishTime());
  tEvent.addInfo("ERROR", getError());
  // The failed attempt id may be null; record the empty string then.
  tEvent.addInfo("FAILED_ATTEMPT_ID",
      getFailedAttemptID() == null ? "" : getFailedAttemptID().toString());
  // Counters are serialized to a JSON array of counter groups.
  tEvent.addInfo("COUNTERS_GROUPS",
      JobHistoryEventUtils.countersToJSON(getCounters()));
  return tEvent;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
index edbf009..b750072 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
@@ -21,10 +21,14 @@ package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
/**
* Event to record the successful completion of a task
@@ -115,5 +119,19 @@ public class TaskFinishedEvent implements HistoryEvent {
return EventType.TASK_FINISHED;
}
@Override
public TimelineEvent toTimelineEvent() {
  // Convert this history event into a timeline service v2 event.
  // The event id is the upper-cased event type name.
  TimelineEvent tEvent = new TimelineEvent();
  tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
  tEvent.addInfo("TASK_TYPE", getTaskType().toString());
  // Counters are serialized to a JSON array of counter groups.
  tEvent.addInfo("COUNTERS_GROUPS",
      JobHistoryEventUtils.countersToJSON(getCounters()));
  tEvent.addInfo("FINISH_TIME", getFinishTime());
  // STATUS is hard-coded to SUCCEEDED: this event only exists for
  // successfully completed tasks.
  tEvent.addInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
  // The successful attempt id may be null; record the empty string then.
  tEvent.addInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
      getSuccessfulTaskAttemptId() == null ? "" :
      getSuccessfulTaskAttemptId().toString());
  return tEvent;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
index 4c2b132..f56b6c3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
/**
* Event to record the start of a task
@@ -67,5 +69,15 @@ public class TaskStartedEvent implements HistoryEvent {
public EventType getEventType() {
return EventType.TASK_STARTED;
}
+
@Override
public TimelineEvent toTimelineEvent() {
  // Convert this history event into a timeline service v2 event.
  // The event id is the upper-cased event type name.
  TimelineEvent tEvent = new TimelineEvent();
  tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
  tEvent.addInfo("TASK_TYPE", getTaskType().toString());
  tEvent.addInfo("START_TIME", getStartTime());
  tEvent.addInfo("SPLIT_LOCATIONS", getSplitLocations());
  return tEvent;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java
index 208c5c4..29f1cb4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java
@@ -23,6 +23,8 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.avro.util.Utf8;
@@ -58,5 +60,13 @@ public class TaskUpdatedEvent implements HistoryEvent {
public EventType getEventType() {
return EventType.TASK_UPDATED;
}
+
@Override
public TimelineEvent toTimelineEvent() {
  // Convert this history event into a timeline service v2 event.
  // The event id is the upper-cased event type name; only the finish
  // time is carried in the info map for a task-updated event.
  TimelineEvent tEvent = new TimelineEvent();
  tEvent.setId(StringUtils.toUpperCase(getEventType().name()));
  tEvent.addInfo("FINISH_TIME", getFinishTime());
  return tEvent;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java
new file mode 100644
index 0000000..f4896ff
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/JobHistoryEventUtils.java
@@ -0,0 +1,51 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapreduce.util;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.ObjectNode;
+
+public class JobHistoryEventUtils {
+
+ public static JsonNode countersToJSON(Counters counters) {
+ ObjectMapper mapper = new ObjectMapper();
+ ArrayNode nodes = mapper.createArrayNode();
+ if (counters != null) {
+ for (CounterGroup counterGroup : counters) {
+ ObjectNode groupNode = nodes.addObject();
+ groupNode.put("NAME", counterGroup.getName());
+ groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName());
+ ArrayNode countersNode = groupNode.putArray("COUNTERS");
+ for (Counter counter : counterGroup) {
+ ObjectNode counterNode = countersNode.addObject();
+ counterNode.put("NAME", counter.getName());
+ counterNode.put("DISPLAY_NAME", counter.getDisplayName());
+ counterNode.put("VALUE", counter.getValue());
+ }
+ }
+ }
+ return nodes;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeb2b15/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 820c1ac..b5031f7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -614,6 +614,13 @@
</description>
</property>
+ <property>
+ <name>mapreduce.job.new-timeline-service.enabled</name>
+ <value>false</value>
+ <description>Specifies whether to post job and task events to the new timeline service.
+ </description>
+ </property>
+
<property>
<name>mapreduce.input.fileinputformat.split.minsize</name>
<value>0</value>