You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2014/10/28 04:39:19 UTC

git commit: MAPREDUCE-5933. Enabled MR AM to post history events to the timeline server. Contributed by Robert Kanter.

Repository: hadoop
Updated Branches:
  refs/heads/trunk f6b963fdf -> 6b2f11b54


MAPREDUCE-5933. Enabled MR AM to post history events to the timeline server. Contributed by Robert Kanter.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6b2f11b5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6b2f11b5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6b2f11b5

Branch: refs/heads/trunk
Commit: 6b2f11b54bc679b0715fe66bd129e340e8c61c5c
Parents: f6b963f
Author: Zhijie Shen <zj...@apache.org>
Authored: Mon Oct 27 20:35:40 2014 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon Oct 27 20:35:40 2014 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../hadoop-mapreduce-client-app/pom.xml         |   6 +
 .../mapreduce/jobhistory/JobHistoryEvent.java   |   7 +-
 .../jobhistory/JobHistoryEventHandler.java      | 340 ++++++++++++++++++-
 .../jobhistory/TestJobHistoryEventHandler.java  | 248 +++++++++++++-
 .../mapred/TestMRTimelineEventHandling.java     |  88 +++++
 .../hadoop/mapreduce/v2/MiniMRYarnCluster.java  |   7 +-
 7 files changed, 691 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b2f11b5/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 84016b9..fd82d49 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -242,6 +242,9 @@ Release 2.6.0 - UNRELEASED
 
   NEW FEATURES
 
+    MAPREDUCE-5933. Enabled MR AM to post history events to the timeline server.
+    (Robert Kanter via zjshen)
+
   IMPROVEMENTS
 
     MAPREDUCE-5971. Move the default options for distcp -p to

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b2f11b5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml
index 2e597d1..4f6f837 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml
@@ -79,6 +79,12 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b2f11b5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java
index e853b1c..4fe35b2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEvent.java
@@ -27,7 +27,12 @@ public class JobHistoryEvent extends AbstractEvent<EventType>{
   private final HistoryEvent historyEvent;
 
   public JobHistoryEvent(JobId jobID, HistoryEvent historyEvent) {
-    super(historyEvent.getEventType());
+    this(jobID, historyEvent, System.currentTimeMillis());
+  }
+
+  public JobHistoryEvent(JobId jobID, HistoryEvent historyEvent,
+          long timestamp) {
+    super(historyEvent.getEventType(), timestamp);
     this.jobID = jobID;
     this.historyEvent = historyEvent;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b2f11b5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index c566740..ebedc1b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -41,7 +41,9 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -57,8 +59,16 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.ObjectNode;
 
 /**
  * The job history events get routed to this class. This class writes the Job
@@ -108,6 +118,11 @@ public class JobHistoryEventHandler extends AbstractService
   // should job completion be force when the AM shuts down?
   protected volatile boolean forceJobCompletion = false;
 
+  protected TimelineClient timelineClient;
+
+  private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
+  private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
+
   public JobHistoryEventHandler(AppContext context, int startCount) {
     super("JobHistoryEventHandler");
     this.context = context;
@@ -225,7 +240,10 @@ public class JobHistoryEventHandler extends AbstractService
         conf.getInt(
             MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
             MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
-    
+
+    timelineClient = TimelineClient.createTimelineClient();
+    timelineClient.init(conf);
+
     super.serviceInit(conf);
   }
 
@@ -250,6 +268,7 @@ public class JobHistoryEventHandler extends AbstractService
 
   @Override
   protected void serviceStart() throws Exception {
+    timelineClient.start();
     eventHandlingThread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -372,6 +391,9 @@ public class JobHistoryEventHandler extends AbstractService
         LOG.info("Exception while closing file " + e.getMessage());
       }
     }
+    if (timelineClient != null) {
+      timelineClient.stop();
+    }
     LOG.info("Stopped JobHistoryEventHandler. super.stop()");
     super.serviceStop();
   }
@@ -515,6 +537,7 @@ public class JobHistoryEventHandler extends AbstractService
       // For all events
       // (1) Write it out
       // (2) Process it for JobSummary
+      // (3) Process it for ATS
       MetaInfo mi = fileMap.get(event.getJobID());
       try {
         HistoryEvent historyEvent = event.getHistoryEvent();
@@ -523,6 +546,8 @@ public class JobHistoryEventHandler extends AbstractService
         }
         processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
             event.getJobID());
+        processEventForTimelineServer(historyEvent, event.getJobID(),
+                event.getTimestamp());
         if (LOG.isDebugEnabled()) {
           LOG.debug("In HistoryEventHandler "
               + event.getHistoryEvent().getEventType());
@@ -667,6 +692,319 @@ public class JobHistoryEventHandler extends AbstractService
     }
   }
 
+  private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
+          long timestamp) {
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(event.getEventType().name().toUpperCase());
+    tEvent.setTimestamp(timestamp);
+    TimelineEntity tEntity = new TimelineEntity();
+
+    switch (event.getEventType()) {
+      case JOB_SUBMITTED:
+        JobSubmittedEvent jse =
+            (JobSubmittedEvent) event;
+        tEvent.addEventInfo("SUBMIT_TIME", jse.getSubmitTime());
+        tEvent.addEventInfo("QUEUE_NAME", jse.getJobQueueName());
+        tEvent.addEventInfo("JOB_NAME", jse.getJobName());
+        tEvent.addEventInfo("USER_NAME", jse.getUserName());
+        tEvent.addEventInfo("JOB_CONF_PATH", jse.getJobConfPath());
+        tEvent.addEventInfo("ACLS", jse.getJobAcls());
+        tEvent.addEventInfo("JOB_QUEUE_NAME", jse.getJobQueueName());
+        tEvent.addEventInfo("WORKLFOW_ID", jse.getWorkflowId());
+        tEvent.addEventInfo("WORKFLOW_NAME", jse.getWorkflowName());
+        tEvent.addEventInfo("WORKFLOW_NAME_NAME", jse.getWorkflowNodeName());
+        tEvent.addEventInfo("WORKFLOW_ADJACENCIES",
+                jse.getWorkflowAdjacencies());
+        tEvent.addEventInfo("WORKFLOW_TAGS", jse.getWorkflowTags());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      case JOB_STATUS_CHANGED:
+        JobStatusChangedEvent jsce = (JobStatusChangedEvent) event;
+        tEvent.addEventInfo("STATUS", jsce.getStatus());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      case JOB_INFO_CHANGED:
+        JobInfoChangeEvent jice = (JobInfoChangeEvent) event;
+        tEvent.addEventInfo("SUBMIT_TIME", jice.getSubmitTime());
+        tEvent.addEventInfo("LAUNCH_TIME", jice.getLaunchTime());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      case JOB_INITED:
+        JobInitedEvent jie = (JobInitedEvent) event;
+        tEvent.addEventInfo("START_TIME", jie.getLaunchTime());
+        tEvent.addEventInfo("STATUS", jie.getStatus());
+        tEvent.addEventInfo("TOTAL_MAPS", jie.getTotalMaps());
+        tEvent.addEventInfo("TOTAL_REDUCES", jie.getTotalReduces());
+        tEvent.addEventInfo("UBERIZED", jie.getUberized());
+        tEntity.setStartTime(jie.getLaunchTime());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      case JOB_PRIORITY_CHANGED:
+        JobPriorityChangeEvent jpce = (JobPriorityChangeEvent) event;
+        tEvent.addEventInfo("PRIORITY", jpce.getPriority().toString());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      case JOB_QUEUE_CHANGED:
+        JobQueueChangeEvent jqe = (JobQueueChangeEvent) event;
+        tEvent.addEventInfo("QUEUE_NAMES", jqe.getJobQueueName());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      case JOB_FAILED:
+      case JOB_KILLED:
+      case JOB_ERROR:
+        JobUnsuccessfulCompletionEvent juce =
+              (JobUnsuccessfulCompletionEvent) event;
+        tEvent.addEventInfo("FINISH_TIME", juce.getFinishTime());
+        tEvent.addEventInfo("NUM_MAPS", juce.getFinishedMaps());
+        tEvent.addEventInfo("NUM_REDUCES", juce.getFinishedReduces());
+        tEvent.addEventInfo("JOB_STATUS", juce.getStatus());
+        tEvent.addEventInfo("DIAGNOSTICS", juce.getDiagnostics());
+        tEvent.addEventInfo("FINISHED_MAPS", juce.getFinishedMaps());
+        tEvent.addEventInfo("FINISHED_REDUCES", juce.getFinishedReduces());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      case JOB_FINISHED:
+        JobFinishedEvent jfe = (JobFinishedEvent) event;
+        tEvent.addEventInfo("FINISH_TIME", jfe.getFinishTime());
+        tEvent.addEventInfo("NUM_MAPS", jfe.getFinishedMaps());
+        tEvent.addEventInfo("NUM_REDUCES", jfe.getFinishedReduces());
+        tEvent.addEventInfo("FAILED_MAPS", jfe.getFailedMaps());
+        tEvent.addEventInfo("FAILED_REDUCES", jfe.getFailedReduces());
+        tEvent.addEventInfo("FINISHED_MAPS", jfe.getFinishedMaps());
+        tEvent.addEventInfo("FINISHED_REDUCES", jfe.getFinishedReduces());
+        tEvent.addEventInfo("MAP_COUNTERS_GROUPS",
+                countersToJSON(jfe.getTotalCounters()));
+        tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS",
+                countersToJSON(jfe.getReduceCounters()));
+        tEvent.addEventInfo("TOTAL_COUNTERS_GROUPS",
+                countersToJSON(jfe.getTotalCounters()));
+        tEvent.addEventInfo("JOB_STATUS", JobState.SUCCEEDED.toString());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      case TASK_STARTED:
+        TaskStartedEvent tse = (TaskStartedEvent) event;
+        tEvent.addEventInfo("TASK_TYPE", tse.getTaskType().toString());
+        tEvent.addEventInfo("START_TIME", tse.getStartTime());
+        tEvent.addEventInfo("SPLIT_LOCATIONS", tse.getSplitLocations());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(tse.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case TASK_FAILED:
+        TaskFailedEvent tfe = (TaskFailedEvent) event;
+        tEvent.addEventInfo("TASK_TYPE", tfe.getTaskType().toString());
+        tEvent.addEventInfo("STATUS", TaskStatus.State.FAILED.toString());
+        tEvent.addEventInfo("FINISH_TIME", tfe.getFinishTime());
+        tEvent.addEventInfo("ERROR", tfe.getError());
+        tEvent.addEventInfo("FAILED_ATTEMPT_ID",
+                tfe.getFailedAttemptID() == null ?
+                "" : tfe.getFailedAttemptID().toString());
+        tEvent.addEventInfo("COUNTERS_GROUPS",
+                countersToJSON(tfe.getCounters()));
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(tfe.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case TASK_UPDATED:
+        TaskUpdatedEvent tue = (TaskUpdatedEvent) event;
+        tEvent.addEventInfo("FINISH_TIME", tue.getFinishTime());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(tue.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case TASK_FINISHED:
+        TaskFinishedEvent tfe2 = (TaskFinishedEvent) event;
+        tEvent.addEventInfo("TASK_TYPE", tfe2.getTaskType().toString());
+        tEvent.addEventInfo("COUNTERS_GROUPS",
+                countersToJSON(tfe2.getCounters()));
+        tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime());
+        tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
+        tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
+                tfe2.getSuccessfulTaskAttemptId() == null ?
+                "" : tfe2.getSuccessfulTaskAttemptId().toString());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(tfe2.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case MAP_ATTEMPT_STARTED:
+      case CLEANUP_ATTEMPT_STARTED:
+      case REDUCE_ATTEMPT_STARTED:
+      case SETUP_ATTEMPT_STARTED:
+        TaskAttemptStartedEvent tase = (TaskAttemptStartedEvent) event;
+        tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString());
+        tEvent.addEventInfo("TASK_ATTEMPT_ID",
+            tase.getTaskAttemptId().toString() == null ?
+            "" : tase.getTaskAttemptId().toString());
+        tEvent.addEventInfo("START_TIME", tase.getStartTime());
+        tEvent.addEventInfo("HTTP_PORT", tase.getHttpPort());
+        tEvent.addEventInfo("TRACKER_NAME", tase.getTrackerName());
+        tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString());
+        tEvent.addEventInfo("SHUFFLE_PORT", tase.getShufflePort());
+        tEvent.addEventInfo("CONTAINER_ID", tase.getContainerId() == null ?
+            "" : tase.getContainerId().toString());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(tase.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case MAP_ATTEMPT_FAILED:
+      case CLEANUP_ATTEMPT_FAILED:
+      case REDUCE_ATTEMPT_FAILED:
+      case SETUP_ATTEMPT_FAILED:
+      case MAP_ATTEMPT_KILLED:
+      case CLEANUP_ATTEMPT_KILLED:
+      case REDUCE_ATTEMPT_KILLED:
+      case SETUP_ATTEMPT_KILLED:
+        TaskAttemptUnsuccessfulCompletionEvent tauce =
+                (TaskAttemptUnsuccessfulCompletionEvent) event;
+        tEvent.addEventInfo("TASK_TYPE", tauce.getTaskType().toString());
+        tEvent.addEventInfo("TASK_ATTEMPT_ID",
+            tauce.getTaskAttemptId() == null ?
+            "" : tauce.getTaskAttemptId().toString());
+        tEvent.addEventInfo("FINISH_TIME", tauce.getFinishTime());
+        tEvent.addEventInfo("ERROR", tauce.getError());
+        tEvent.addEventInfo("STATUS", tauce.getTaskStatus());
+        tEvent.addEventInfo("HOSTNAME", tauce.getHostname());
+        tEvent.addEventInfo("PORT", tauce.getPort());
+        tEvent.addEventInfo("RACK_NAME", tauce.getRackName());
+        tEvent.addEventInfo("SHUFFLE_FINISH_TIME", tauce.getFinishTime());
+        tEvent.addEventInfo("SORT_FINISH_TIME", tauce.getFinishTime());
+        tEvent.addEventInfo("MAP_FINISH_TIME", tauce.getFinishTime());
+        tEvent.addEventInfo("COUNTERS_GROUPS",
+                countersToJSON(tauce.getCounters()));
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(tauce.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case MAP_ATTEMPT_FINISHED:
+        MapAttemptFinishedEvent mafe = (MapAttemptFinishedEvent) event;
+        tEvent.addEventInfo("TASK_TYPE", mafe.getTaskType().toString());
+        tEvent.addEventInfo("FINISH_TIME", mafe.getFinishTime());
+        tEvent.addEventInfo("STATUS", mafe.getTaskStatus());
+        tEvent.addEventInfo("STATE", mafe.getState());
+        tEvent.addEventInfo("MAP_FINISH_TIME", mafe.getMapFinishTime());
+        tEvent.addEventInfo("COUNTERS_GROUPS",
+                countersToJSON(mafe.getCounters()));
+        tEvent.addEventInfo("HOSTNAME", mafe.getHostname());
+        tEvent.addEventInfo("PORT", mafe.getPort());
+        tEvent.addEventInfo("RACK_NAME", mafe.getRackName());
+        tEvent.addEventInfo("ATTEMPT_ID", mafe.getAttemptId() == null ?
+            "" : mafe.getAttemptId().toString());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(mafe.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case REDUCE_ATTEMPT_FINISHED:
+        ReduceAttemptFinishedEvent rafe = (ReduceAttemptFinishedEvent) event;
+        tEvent.addEventInfo("TASK_TYPE", rafe.getTaskType().toString());
+        tEvent.addEventInfo("ATTEMPT_ID", rafe.getAttemptId() == null ?
+            "" : rafe.getAttemptId().toString());
+        tEvent.addEventInfo("FINISH_TIME", rafe.getFinishTime());
+        tEvent.addEventInfo("STATUS", rafe.getTaskStatus());
+        tEvent.addEventInfo("STATE", rafe.getState());
+        tEvent.addEventInfo("SHUFFLE_FINISH_TIME", rafe.getShuffleFinishTime());
+        tEvent.addEventInfo("SORT_FINISH_TIME", rafe.getSortFinishTime());
+        tEvent.addEventInfo("COUNTERS_GROUPS",
+                countersToJSON(rafe.getCounters()));
+        tEvent.addEventInfo("HOSTNAME", rafe.getHostname());
+        tEvent.addEventInfo("PORT", rafe.getPort());
+        tEvent.addEventInfo("RACK_NAME", rafe.getRackName());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(rafe.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case SETUP_ATTEMPT_FINISHED:
+      case CLEANUP_ATTEMPT_FINISHED:
+        TaskAttemptFinishedEvent tafe = (TaskAttemptFinishedEvent) event;
+        tEvent.addEventInfo("TASK_TYPE", tafe.getTaskType().toString());
+        tEvent.addEventInfo("ATTEMPT_ID", tafe.getAttemptId() == null ?
+            "" : tafe.getAttemptId().toString());
+        tEvent.addEventInfo("FINISH_TIME", tafe.getFinishTime());
+        tEvent.addEventInfo("STATUS", tafe.getTaskStatus());
+        tEvent.addEventInfo("STATE", tafe.getState());
+        tEvent.addEventInfo("COUNTERS_GROUPS",
+                countersToJSON(tafe.getCounters()));
+        tEvent.addEventInfo("HOSTNAME", tafe.getHostname());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(tafe.getTaskId().toString());
+        tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
+        tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
+        break;
+      case AM_STARTED:
+        AMStartedEvent ase = (AMStartedEvent) event;
+        tEvent.addEventInfo("APPLICATION_ATTEMPT_ID",
+                ase.getAppAttemptId() == null ?
+                "" : ase.getAppAttemptId().toString());
+        tEvent.addEventInfo("CONTAINER_ID", ase.getContainerId() == null ?
+                "" : ase.getContainerId().toString());
+        tEvent.addEventInfo("NODE_MANAGER_HOST", ase.getNodeManagerHost());
+        tEvent.addEventInfo("NODE_MANAGER_PORT", ase.getNodeManagerPort());
+        tEvent.addEventInfo("NODE_MANAGER_HTTP_PORT",
+                ase.getNodeManagerHttpPort());
+        tEvent.addEventInfo("START_TIME", ase.getStartTime());
+        tEntity.addEvent(tEvent);
+        tEntity.setEntityId(jobId.toString());
+        tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
+        break;
+      default:
+        break;
+    }
+
+    try {
+      timelineClient.putEntities(tEntity);
+    } catch (IOException ex) {
+      LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
+      + "Server", ex);
+    } catch (YarnException ex) {
+      LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
+      + "Server", ex);
+    }
+  }
+
+  // Serializes counters as a JSON array of groups; null gives an empty array.
+  @Private
+  public JsonNode countersToJSON(Counters counters) {
+    ArrayNode groupsJson = new ObjectMapper().createArrayNode();
+    if (counters != null) {
+      for (CounterGroup group : counters) {
+        ObjectNode groupJson = groupsJson.addObject();
+        groupJson.put("NAME", group.getName());
+        groupJson.put("DISPLAY_NAME", group.getDisplayName());
+        ArrayNode membersJson = groupJson.putArray("COUNTERS");
+        for (Counter member : group) {
+          ObjectNode memberJson = membersJson.addObject();
+          memberJson.put("NAME", member.getName());
+          memberJson.put("DISPLAY_NAME", member.getDisplayName());
+          memberJson.put("VALUE", member.getValue());
+        }
+      }
+    }
+    return groupsJson;
+  }
+
   private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
 
     Counter slotMillisMapCounter = allCounters

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b2f11b5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 7539e73..48042d3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.HashMap;
 
 import org.junit.Assert;
 
@@ -42,9 +43,12 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
@@ -55,14 +59,22 @@ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.junit.After;
 import org.junit.AfterClass;
 import static org.junit.Assert.assertFalse;
 import org.junit.BeforeClass;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.timeline.TimelineStore;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -126,7 +138,7 @@ public class TestJobHistoryEventHandler {
 
       // First completion event, but min-queue-size for batching flushes is 10
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-          t.taskID, null, 0, TaskType.MAP, "", null)));
+          t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
       verify(mockWriter).flush();
 
     } finally {
@@ -162,7 +174,7 @@ public class TestJobHistoryEventHandler {
 
       for (int i = 0 ; i < 100 ; i++) {
         queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-            t.taskID, null, 0, TaskType.MAP, "", null)));
+            t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
       }
 
       handleNextNEvents(jheh, 9);
@@ -207,7 +219,7 @@ public class TestJobHistoryEventHandler {
 
       for (int i = 0 ; i < 100 ; i++) {
         queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-            t.taskID, null, 0, TaskType.MAP, "", null)));
+            t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
       }
 
       handleNextNEvents(jheh, 9);
@@ -248,7 +260,7 @@ public class TestJobHistoryEventHandler {
 
       for (int i = 0 ; i < 100 ; i++) {
         queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-            t.taskID, null, 0, TaskType.MAP, "", null)));
+            t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null)));
       }
       queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
           TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters())));
@@ -427,6 +439,231 @@ public class TestJobHistoryEventHandler {
         pathStr);
   }
 
+  // Have JobHistoryEventHandler handle some events and make sure they get
+  // stored to the Timeline store
+  @Test (timeout=50000)
+  public void testTimelineEventHandling() throws Exception {
+    TestParams t = new TestParams(false);
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    JHEvenHandlerForTest jheh = new JHEvenHandlerForTest(t.mockAppContext, 0);
+    jheh.init(conf);
+    MiniYARNCluster yarnCluster = null;
+    long currentTime = System.currentTimeMillis();
+    try {
+      yarnCluster = new MiniYARNCluster(
+            TestJobHistoryEventHandler.class.getSimpleName(), 1, 1, 1, 1, true);
+      yarnCluster.init(conf);
+      yarnCluster.start();
+      jheh.start(); // NOTE(review): never stopped in this method; confirm
+      TimelineStore ts = yarnCluster.getApplicationHistoryServer()
+              .getTimelineStore();
+      // AM_STARTED is the first event; it should create the one job entity.
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+              t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000),
+              currentTime - 10));
+      TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
+              null, null, null, null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      TimelineEntity tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
+      Assert.assertEquals(1, tEntity.getEvents().size());
+      Assert.assertEquals(EventType.AM_STARTED.toString(),
+              tEntity.getEvents().get(0).getEventType());
+      Assert.assertEquals(currentTime - 10,
+              tEntity.getEvents().get(0).getTimestamp());
+      // JOB_SUBMITTED has a later timestamp, so it is expected at index 0.
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+              new JobSubmittedEvent(TypeConverter.fromYarn(t.jobId), "name",
+              "user", 200, "/foo/job.xml",
+              new HashMap<JobACL, AccessControlList>(), "default"),
+              currentTime + 10));
+      entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
+              null, null, null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
+      Assert.assertEquals(2, tEntity.getEvents().size());
+      Assert.assertEquals(EventType.JOB_SUBMITTED.toString(),
+              tEntity.getEvents().get(0).getEventType());
+      Assert.assertEquals(EventType.AM_STARTED.toString(),
+              tEntity.getEvents().get(1).getEventType());
+      Assert.assertEquals(currentTime + 10,
+              tEntity.getEvents().get(0).getTimestamp());
+      Assert.assertEquals(currentTime - 10,
+              tEntity.getEvents().get(1).getTimestamp());
+      // JOB_QUEUE_CHANGED has the earliest timestamp, so it is expected last.
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+              new JobQueueChangeEvent(TypeConverter.fromYarn(t.jobId), "q2"),
+              currentTime - 20));
+      entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
+              null, null, null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
+      Assert.assertEquals(3, tEntity.getEvents().size());
+      Assert.assertEquals(EventType.JOB_SUBMITTED.toString(),
+              tEntity.getEvents().get(0).getEventType());
+      Assert.assertEquals(EventType.AM_STARTED.toString(),
+              tEntity.getEvents().get(1).getEventType());
+      Assert.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
+              tEntity.getEvents().get(2).getEventType());
+      Assert.assertEquals(currentTime + 10,
+              tEntity.getEvents().get(0).getTimestamp());
+      Assert.assertEquals(currentTime - 10,
+              tEntity.getEvents().get(1).getTimestamp());
+      Assert.assertEquals(currentTime - 20,
+              tEntity.getEvents().get(2).getTimestamp());
+      // JOB_FINISHED at currentTime is expected between submit and AM start.
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+              new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0,
+              0, new Counters(), new Counters(), new Counters()), currentTime));
+      entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
+              null, null, null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
+      Assert.assertEquals(4, tEntity.getEvents().size());
+      Assert.assertEquals(EventType.JOB_SUBMITTED.toString(),
+              tEntity.getEvents().get(0).getEventType());
+      Assert.assertEquals(EventType.JOB_FINISHED.toString(),
+              tEntity.getEvents().get(1).getEventType());
+      Assert.assertEquals(EventType.AM_STARTED.toString(),
+              tEntity.getEvents().get(2).getEventType());
+      Assert.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
+              tEntity.getEvents().get(3).getEventType());
+      Assert.assertEquals(currentTime + 10,
+              tEntity.getEvents().get(0).getTimestamp());
+      Assert.assertEquals(currentTime,
+              tEntity.getEvents().get(1).getTimestamp());
+      Assert.assertEquals(currentTime - 10,
+              tEntity.getEvents().get(2).getTimestamp());
+      Assert.assertEquals(currentTime - 20,
+              tEntity.getEvents().get(3).getTimestamp());
+      // A completion in state KILLED should surface as JOB_KILLED at index 0.
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+            new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId),
+            0, 0, 0, JobStateInternal.KILLED.toString()), currentTime + 20));
+      entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
+              null, null, null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
+      Assert.assertEquals(5, tEntity.getEvents().size());
+      Assert.assertEquals(EventType.JOB_KILLED.toString(),
+              tEntity.getEvents().get(0).getEventType());
+      Assert.assertEquals(EventType.JOB_SUBMITTED.toString(),
+              tEntity.getEvents().get(1).getEventType());
+      Assert.assertEquals(EventType.JOB_FINISHED.toString(),
+              tEntity.getEvents().get(2).getEventType());
+      Assert.assertEquals(EventType.AM_STARTED.toString(),
+              tEntity.getEvents().get(3).getEventType());
+      Assert.assertEquals(EventType.JOB_QUEUE_CHANGED.toString(),
+              tEntity.getEvents().get(4).getEventType());
+      Assert.assertEquals(currentTime + 20,
+              tEntity.getEvents().get(0).getTimestamp());
+      Assert.assertEquals(currentTime + 10,
+              tEntity.getEvents().get(1).getTimestamp());
+      Assert.assertEquals(currentTime,
+              tEntity.getEvents().get(2).getTimestamp());
+      Assert.assertEquals(currentTime - 10,
+              tEntity.getEvents().get(3).getTimestamp());
+      Assert.assertEquals(currentTime - 20,
+              tEntity.getEvents().get(4).getTimestamp());
+      // Task events are looked up under the separate MAPREDUCE_TASK type.
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+            new TaskStartedEvent(t.taskID, 0, TaskType.MAP, "")));
+      entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
+              null, null, null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(t.taskID.toString(), tEntity.getEntityId());
+      Assert.assertEquals(1, tEntity.getEvents().size());
+      Assert.assertEquals(EventType.TASK_STARTED.toString(),
+              tEntity.getEvents().get(0).getEventType());
+      Assert.assertEquals(TaskType.MAP.toString(),
+              tEntity.getEvents().get(0).getEventInfo().get("TASK_TYPE"));
+      // A second TASK_STARTED for the same task should join the same entity.
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+            new TaskStartedEvent(t.taskID, 0, TaskType.REDUCE, "")));
+      entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
+              null, null, null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(t.taskID.toString(), tEntity.getEntityId());
+      Assert.assertEquals(2, tEntity.getEvents().size());
+      Assert.assertEquals(EventType.TASK_STARTED.toString(),
+              tEntity.getEvents().get(1).getEventType());
+      Assert.assertEquals(TaskType.REDUCE.toString(),
+              tEntity.getEvents().get(0).getEventInfo().get("TASK_TYPE"));
+      Assert.assertEquals(TaskType.MAP.toString(),
+              tEntity.getEvents().get(1).getEventInfo().get("TASK_TYPE"));
+    } finally {
+      if (yarnCluster != null) {
+        yarnCluster.stop();
+      }
+    }
+  }
+
+  @Test (timeout=50000)
+  public void testCountersToJSON() throws Exception {
+    JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0);
+    // Counters are added unsorted; the expected JSON is ordered by NAME.
+    Counters input = new Counters();
+    CounterGroup doctors = input.addGroup("DOCTORS",
+            "Incarnations of the Doctor");
+    doctors.addCounter("PETER_CAPALDI", "Peter Capaldi", 12);
+    doctors.addCounter("MATT_SMITH", "Matt Smith", 11);
+    doctors.addCounter("DAVID_TENNANT", "David Tennant", 10);
+    CounterGroup companions = input.addGroup("COMPANIONS",
+            "Companions of the Doctor");
+    companions.addCounter("CLARA_OSWALD", "Clara Oswald", 6);
+    companions.addCounter("RORY_WILLIAMS", "Rory Williams", 5);
+    companions.addCounter("AMY_POND", "Amy Pond", 4);
+    companions.addCounter("MARTHA_JONES", "Martha Jones", 3);
+    companions.addCounter("DONNA_NOBLE", "Donna Noble", 2);
+    companions.addCounter("ROSE_TYLER", "Rose Tyler", 1);
+    String expected = "[{\"NAME\":\"COMPANIONS\",\"DISPLAY_NAME\":\"Companions "
+        + "of the Doctor\",\"COUNTERS\":[{\"NAME\":\"AMY_POND\",\"DISPLAY_NAME\""
+        + ":\"Amy Pond\",\"VALUE\":4},{\"NAME\":\"CLARA_OSWALD\","
+        + "\"DISPLAY_NAME\":\"Clara Oswald\",\"VALUE\":6},{\"NAME\":"
+        + "\"DONNA_NOBLE\",\"DISPLAY_NAME\":\"Donna Noble\",\"VALUE\":2},"
+        + "{\"NAME\":\"MARTHA_JONES\",\"DISPLAY_NAME\":\"Martha Jones\","
+        + "\"VALUE\":3},{\"NAME\":\"RORY_WILLIAMS\",\"DISPLAY_NAME\":\"Rory "
+        + "Williams\",\"VALUE\":5},{\"NAME\":\"ROSE_TYLER\",\"DISPLAY_NAME\":"
+        + "\"Rose Tyler\",\"VALUE\":1}]},{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\""
+        + ":\"Incarnations of the Doctor\",\"COUNTERS\":[{\"NAME\":"
+        + "\"DAVID_TENNANT\",\"DISPLAY_NAME\":\"David Tennant\",\"VALUE\":10},"
+        + "{\"NAME\":\"MATT_SMITH\",\"DISPLAY_NAME\":\"Matt Smith\",\"VALUE\":"
+        + "11},{\"NAME\":\"PETER_CAPALDI\",\"DISPLAY_NAME\":\"Peter Capaldi\","
+        + "\"VALUE\":12}]}]";
+    Assert.assertEquals(expected,
+        new ObjectMapper().writeValueAsString(jheh.countersToJSON(input)));
+  }
+
+  @Test (timeout=50000)
+  public void testCountersToJSONEmpty() throws Exception {
+    JobHistoryEventHandler jheh = new JobHistoryEventHandler(null, 0);
+    ObjectMapper mapper = new ObjectMapper();
+
+    // A null Counters reference is expected to yield an empty JSON array.
+    Assert.assertEquals("[]",
+        mapper.writeValueAsString(jheh.countersToJSON((Counters) null)));
+
+    // The same is expected for a Counters object that has no groups.
+    Counters counters = new Counters();
+    Assert.assertEquals("[]",
+        mapper.writeValueAsString(jheh.countersToJSON(counters)));
+
+    // A group without counters should keep its names with empty COUNTERS.
+    counters.addGroup("DOCTORS", "Incarnations of the Doctor");
+    String expected =
+        "[{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\":\"Incarnations of the "
+        + "Doctor\",\"COUNTERS\":[]}]";
+    Assert.assertEquals(expected,
+        mapper.writeValueAsString(jheh.countersToJSON(counters)));
+  }
+
   private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
     jheh.handle(event);
   }
@@ -480,6 +717,7 @@ public class TestJobHistoryEventHandler {
         ApplicationAttemptId.newInstance(appId, 1);
     ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
     TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
+    TaskAttemptID taskAttemptID = new TaskAttemptID(taskID, 0);
     JobId jobId = MRBuilderUtils.newJobId(appId, 1);
     AppContext mockAppContext;
 
@@ -557,11 +795,13 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
   private boolean mockHistoryProcessing = true;
   public JHEvenHandlerForTest(AppContext context, int startCount) {
     super(context, startCount);
+    JobHistoryEventHandler.fileMap.clear();
   }
 
   public JHEvenHandlerForTest(AppContext context, int startCount, boolean mockHistoryProcessing) {
     super(context, startCount);
     this.mockHistoryProcessing = mockHistoryProcessing;
+    JobHistoryEventHandler.fileMap.clear();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b2f11b5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
new file mode 100644
index 0000000..2352818
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
+import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timeline.TimelineStore;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** End-to-end check that MR job history events reach the timeline store. */
+public class TestMRTimelineEventHandling {
+  @Test
+  public void testMRTimelineEventHandling() throws Exception {
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    MiniMRYarnCluster cluster = null;
+    try {
+      // Name the cluster after this test class; "true" enables the AHS.
+      cluster = new MiniMRYarnCluster(
+              TestMRTimelineEventHandling.class.getSimpleName(), 1, true);
+      cluster.init(conf);
+      cluster.start();
+      TimelineStore ts = cluster.getApplicationHistoryServer()
+              .getTimelineStore();
+      // Successful job: first event is JOB_FINISHED, last is AM_STARTED.
+      Path inDir = new Path("input");
+      Path outDir = new Path("output");
+      RunningJob job =
+              UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.SUCCEEDED,
+              job.getJobStatus().getState().getValue());
+      TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
+              null, null, null, null, null, null);
+      Assert.assertEquals(1, entities.getEntities().size());
+      TimelineEntity tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
+      Assert.assertEquals("MAPREDUCE_JOB", tEntity.getEntityType());
+      Assert.assertEquals(EventType.AM_STARTED.toString(),
+              tEntity.getEvents().get(tEntity.getEvents().size() - 1)
+              .getEventType());
+      Assert.assertEquals(EventType.JOB_FINISHED.toString(),
+              tEntity.getEvents().get(0).getEventType());
+      // Failed job: its entity is listed first; first event is JOB_FAILED.
+      job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.FAILED,
+              job.getJobStatus().getState().getValue());
+      entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
+              null, null, null);
+      Assert.assertEquals(2, entities.getEntities().size());
+      tEntity = entities.getEntities().get(0);
+      Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
+      Assert.assertEquals("MAPREDUCE_JOB", tEntity.getEntityType());
+      Assert.assertEquals(EventType.AM_STARTED.toString(),
+              tEntity.getEvents().get(tEntity.getEvents().size() - 1)
+              .getEventType());
+      Assert.assertEquals(EventType.JOB_FAILED.toString(),
+              tEntity.getEvents().get(0).getEventType());
+    } finally {
+      if (cluster != null) {
+        cluster.stop();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b2f11b5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
index 81b8b16..47b38a1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
@@ -72,8 +72,11 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
   }
 
   public MiniMRYarnCluster(String testName, int noOfNMs) {
-    super(testName, noOfNMs, 4, 4);
-    //TODO: add the history server
+    this(testName, noOfNMs, false);
+  }
+
+  public MiniMRYarnCluster(String testName, int noOfNMs, boolean enableAHS) {
+    super(testName, 1, noOfNMs, 4, 4, enableAHS);
     historyServerWrapper = new JobHistoryServerWrapper();
     addService(historyServerWrapper);
   }