You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2014/02/26 22:45:58 UTC

svn commit: r1572279 - in /hadoop/common/branches/branch-2.4/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/...

Author: jlowe
Date: Wed Feb 26 21:45:58 2014
New Revision: 1572279

URL: http://svn.apache.org/r1572279
Log:
svn merge -c 1572269 FIXES: Preserve Job diagnostics in history. Contributed by Gera Shegalov

Added:
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1393307629410_0001-1393307687476-user-Sleep+job-1393307723835-0-0-FAILED-default-1393307693920.jhist
      - copied unchanged from r1572269, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1393307629410_0001-1393307687476-user-Sleep+job-1393307723835-0-0-FAILED-default-1393307693920.jhist
Modified:
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
    hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt?rev=1572279&r1=1572278&r2=1572279&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/CHANGES.txt Wed Feb 26 21:45:58 2014
@@ -19,6 +19,9 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5761. Added a simple log message to denote when encrypted shuffle
     is on in the shuffle-handler. (Jian He via vinodkv) 
 
+    MAPREDUCE-5754. Preserve Job diagnostics in history (Gera Shegalov via
+    jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1572279&r1=1572278&r2=1572279&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Wed Feb 26 21:45:58 2014
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.TypeC
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
@@ -343,11 +344,12 @@ public class JobHistoryEventHandler exte
           LOG.warn("Found jobId " + toClose
             + " to have not been closed. Will close");
           //Create a JobFinishEvent so that it is written to the job history
+          final Job job = context.getJob(toClose);
           JobUnsuccessfulCompletionEvent jucEvent =
             new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose),
-              System.currentTimeMillis(), context.getJob(toClose)
-              .getCompletedMaps(), context.getJob(toClose).getCompletedReduces(),
-              JobState.KILLED.toString());
+                System.currentTimeMillis(), job.getCompletedMaps(),
+                job.getCompletedReduces(), JobState.KILLED.toString(),
+                job.getDiagnostics());
           JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent);
           //Bypass the queue mechanism which might wait. Call the method directly
           handleEvent(jfEvent);

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1572279&r1=1572278&r2=1572279&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Wed Feb 26 21:45:58 2014
@@ -149,6 +149,9 @@ public class JobImpl implements org.apac
 
   // Maximum no. of fetch-failure notifications after which map task is failed
   private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
+
+  public static final String JOB_KILLED_DIAG =
+      "Job received Kill while in RUNNING state.";
   
   //final fields
   private final ApplicationAttemptId applicationAttemptId;
@@ -1617,7 +1620,8 @@ public class JobImpl implements org.apac
               finishTime,
               succeededMapTaskCount,
               succeededReduceTaskCount,
-              finalState.toString());
+              finalState.toString(),
+              diagnostics);
       eventHandler.handle(new JobHistoryEvent(jobId,
           unsuccessfulJobEvent));
       finished(finalState);
@@ -1730,7 +1734,7 @@ public class JobImpl implements org.apac
       JobUnsuccessfulCompletionEvent failedEvent =
           new JobUnsuccessfulCompletionEvent(job.oldJobId,
               job.finishTime, 0, 0,
-              JobStateInternal.KILLED.toString());
+              JobStateInternal.KILLED.toString(), job.diagnostics);
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
       job.finished(JobStateInternal.KILLED);
     }
@@ -1763,7 +1767,7 @@ public class JobImpl implements org.apac
       implements SingleArcTransition<JobImpl, JobEvent> {
     @Override
     public void transition(JobImpl job, JobEvent event) {
-      job.addDiagnostic("Job received Kill while in RUNNING state.");
+      job.addDiagnostic(JOB_KILLED_DIAG);
       for (Task task : job.tasks.values()) {
         job.eventHandler.handle(
             new TaskEvent(task.getID(), TaskEventType.T_KILL));
@@ -2127,7 +2131,7 @@ public class JobImpl implements org.apac
       JobUnsuccessfulCompletionEvent failedEvent =
           new JobUnsuccessfulCompletionEvent(job.oldJobId,
               job.finishTime, 0, 0,
-              jobHistoryString);
+              jobHistoryString, job.diagnostics);
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
       job.finished(terminationState);
     }

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java?rev=1572279&r1=1572278&r2=1572279&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java Wed Feb 26 21:45:58 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.JobID
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.junit.Test;
 
 public class TestEvents {
@@ -334,11 +335,12 @@ public class TestEvents {
   private FakeEvent getJobKilledEvent() {
     FakeEvent result = new FakeEvent(EventType.JOB_KILLED);
     JobUnsuccessfulCompletion datum = new JobUnsuccessfulCompletion();
-    datum.finishedMaps = 1;
-    datum.finishedReduces = 2;
-    datum.finishTime = 3;
-    datum.jobid = "ID";
-    datum.jobStatus = "STATUS";
+    datum.setFinishedMaps(1);
+    datum.setFinishedReduces(2);
+    datum.setFinishTime(3L);
+    datum.setJobid("ID");
+    datum.setJobStatus("STATUS");
+    datum.setDiagnostics(JobImpl.JOB_KILLED_DIAG);
     result.setDatum(datum);
     return result;
   }

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr?rev=1572279&r1=1572278&r2=1572279&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr Wed Feb 26 21:45:58 2014
@@ -135,7 +135,8 @@
           {"name": "finishTime", "type": "long"},
           {"name": "finishedMaps", "type": "int"},
           {"name": "finishedReduces", "type": "int"},
-          {"name": "jobStatus", "type": "string"}
+          {"name": "jobStatus", "type": "string"},
+          {"name": "diagnostics", "type": "string"}
       ]
      },
 

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1572279&r1=1572278&r2=1572279&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Wed Feb 26 21:45:58 2014
@@ -353,10 +353,6 @@ public class JobHistoryParser implements
     taskInfo.error = StringInterner.weakIntern(event.getError());
     taskInfo.failedDueToAttemptId = event.getFailedAttemptID();
     taskInfo.counters = event.getCounters();
-    if (info.errorInfo.isEmpty()) {
-      info.errorInfo = "Task " + taskInfo.taskId + " failed " +
-          taskInfo.attemptsMap.size() + " times ";
-    }
   }
 
   private void handleTaskStartedEvent(TaskStartedEvent event) {
@@ -373,6 +369,7 @@ public class JobHistoryParser implements
     info.finishedMaps = event.getFinishedMaps();
     info.finishedReduces = event.getFinishedReduces();
     info.jobStatus = StringInterner.weakIntern(event.getStatus());
+    info.errorInfo = StringInterner.weakIntern(event.getDiagnostics());
   }
 
   private void handleJobFinishedEvent(JobFinishedEvent event) {

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java?rev=1572279&r1=1572278&r2=1572279&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java Wed Feb 26 21:45:58 2014
@@ -18,11 +18,15 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import com.google.common.base.Joiner;
+
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.JobID;
 
+import java.util.Collections;
+
 /**
  * Event to record Failed and Killed completion of jobs
  *
@@ -30,6 +34,10 @@ import org.apache.hadoop.mapreduce.JobID
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
+  private static final String NODIAGS = "";
+  private static final Iterable<String> NODIAGS_LIST =
+      Collections.singletonList(NODIAGS);
+
   private JobUnsuccessfulCompletion datum
     = new JobUnsuccessfulCompletion();
 
@@ -44,11 +52,33 @@ public class JobUnsuccessfulCompletionEv
   public JobUnsuccessfulCompletionEvent(JobID id, long finishTime,
       int finishedMaps,
       int finishedReduces, String status) {
-    datum.jobid = new Utf8(id.toString());
-    datum.finishTime = finishTime;
-    datum.finishedMaps = finishedMaps;
-    datum.finishedReduces = finishedReduces;
-    datum.jobStatus = new Utf8(status);
+    this(id, finishTime, finishedMaps, finishedReduces, status, NODIAGS_LIST);
+  }
+
+  /**
+   * Create an event to record unsuccessful completion (killed/failed) of jobs
+   * @param id Job ID
+   * @param finishTime Finish time of the job
+   * @param finishedMaps Number of finished maps
+   * @param finishedReduces Number of finished reduces
+   * @param status Status of the job
+   * @param diagnostics job runtime diagnostics
+   */
+  public JobUnsuccessfulCompletionEvent(JobID id, long finishTime,
+      int finishedMaps,
+      int finishedReduces,
+      String status,
+      Iterable<String> diagnostics) {
+    datum.setJobid(new Utf8(id.toString()));
+    datum.setFinishTime(finishTime);
+    datum.setFinishedMaps(finishedMaps);
+    datum.setFinishedReduces(finishedReduces);
+    datum.setJobStatus(new Utf8(status));
+    if (diagnostics == null) {
+      diagnostics = NODIAGS_LIST;
+    }
+    datum.setDiagnostics(new Utf8(Joiner.on('\n').skipNulls()
+        .join(diagnostics)));
   }
 
   JobUnsuccessfulCompletionEvent() {}
@@ -61,13 +91,13 @@ public class JobUnsuccessfulCompletionEv
   /** Get the Job ID */
   public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
   /** Get the job finish time */
-  public long getFinishTime() { return datum.finishTime; }
+  public long getFinishTime() { return datum.getFinishTime(); }
   /** Get the number of finished maps */
-  public int getFinishedMaps() { return datum.finishedMaps; }
+  public int getFinishedMaps() { return datum.getFinishedMaps(); }
   /** Get the number of finished reduces */
-  public int getFinishedReduces() { return datum.finishedReduces; }
+  public int getFinishedReduces() { return datum.getFinishedReduces(); }
   /** Get the status */
-  public String getStatus() { return datum.jobStatus.toString(); }
+  public String getStatus() { return datum.getJobStatus().toString(); }
   /** Get the event type */
   public EventType getEventType() {
     if ("FAILED".equals(getStatus())) {
@@ -78,4 +108,13 @@ public class JobUnsuccessfulCompletionEv
       return EventType.JOB_KILLED;
   }
 
+  /**
+   * Retrieves diagnostics information preserved in the history file
+   *
+   * @return diagnostics as of the time of job termination
+   */
+  public String getDiagnostics() {
+    final CharSequence diagnostics = datum.getDiagnostics();
+    return diagnostics == null ? NODIAGS : diagnostics.toString();
+  }
 }

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml?rev=1572279&r1=1572278&r2=1572279&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml Wed Feb 26 21:45:58 2014
@@ -73,7 +73,7 @@
         <configuration>
           <excludes>
             <exclude>src/test/resources/job_1329348432655_0001_conf.xml</exclude>
-            <exclude>src/test/resources/job_1329348432655_0001-1329348443227-user-Sleep+job-1329348468601-10-1-SUCCEEDED-default.jhist</exclude>
+            <exclude>src/test/resources/*.jhist</exclude>
           </excludes>
         </configuration>
       </plugin>

Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1572279&r1=1572278&r2=1572279&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Wed Feb 26 21:45:58 2014
@@ -18,7 +18,10 @@
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
-import static junit.framework.Assert.assertEquals;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
+    .NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -26,6 +29,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,9 +41,9 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobID;
@@ -54,6 +58,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
@@ -67,8 +72,11 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.TaskIdPBImpl;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
@@ -150,7 +158,7 @@ public class TestJobHistoryParsing {
     conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
     long amStartTimeEst = System.currentTimeMillis();
     conf.setClass(
-        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
         MyResolver.class, DNSToSwitchMapping.class);
     RackResolver.init(conf);
     MRApp app = new MRAppWithHistory(numMaps, numReduces, true, this.getClass()
@@ -391,7 +399,7 @@ public class TestJobHistoryParsing {
     try {
       Configuration conf = new Configuration();
       conf.setClass(
-          CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+          NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
           MyResolver.class, DNSToSwitchMapping.class);
       RackResolver.init(conf);
       MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this
@@ -456,7 +464,7 @@ public class TestJobHistoryParsing {
     try {
       Configuration conf = new Configuration();
       conf.setClass(
-          CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+          NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
           MyResolver.class, DNSToSwitchMapping.class);
       RackResolver.init(conf);
       MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true, this
@@ -500,18 +508,85 @@ public class TestJobHistoryParsing {
         Assert.assertNotNull("completed task report has null counters", ct
             .getReport().getCounters());
       }
+      final List<String> originalDiagnostics = job.getDiagnostics();
+      final String historyError = jobInfo.getErrorInfo();
+      assertTrue("No original diagnostics for a failed job",
+          originalDiagnostics != null && !originalDiagnostics.isEmpty());
+      assertNotNull("No history error info for a failed job ", historyError);
+      for (String diagString : originalDiagnostics) {
+        assertTrue(historyError.contains(diagString));
+      }
     } finally {
       LOG.info("FINISHED testCountersForFailedTask");
     }
   }
 
+  @Test(timeout = 60000)
+  public void testDiagnosticsForKilledJob() throws Exception {
+    LOG.info("STARTING testDiagnosticsForKilledJob");
+    try {
+      final Configuration conf = new Configuration();
+      conf.setClass(
+          NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+          MyResolver.class, DNSToSwitchMapping.class);
+      RackResolver.init(conf);
+      MRApp app = new MRAppWithHistoryWithJobKilled(2, 1, true, this
+          .getClass().getName(), true);
+      app.submit(conf);
+      Job job = app.getContext().getAllJobs().values().iterator().next();
+      JobId jobId = job.getID();
+      app.waitForState(job, JobState.KILLED);
+
+      // make sure all events are flushed
+      app.waitForState(Service.STATE.STOPPED);
+
+      JobHistory jobHistory = new JobHistory();
+      jobHistory.init(conf);
+
+      HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
+
+      JobHistoryParser parser;
+      JobInfo jobInfo;
+      synchronized (fileInfo) {
+        Path historyFilePath = fileInfo.getHistoryFile();
+        FSDataInputStream in = null;
+        FileContext fc = null;
+        try {
+          fc = FileContext.getFileContext(conf);
+          in = fc.open(fc.makeQualified(historyFilePath));
+        } catch (IOException ioe) {
+          LOG.info("Can not open history file: " + historyFilePath, ioe);
+          throw (new Exception("Can not open History File"));
+        }
+
+        parser = new JobHistoryParser(in);
+        jobInfo = parser.parse();
+      }
+      Exception parseException = parser.getParseException();
+      assertNull("Caught an expected exception " + parseException,
+          parseException);
+      final List<String> originalDiagnostics = job.getDiagnostics();
+      final String historyError = jobInfo.getErrorInfo();
+      assertTrue("No original diagnostics for a failed job",
+          originalDiagnostics != null && !originalDiagnostics.isEmpty());
+      assertNotNull("No history error info for a failed job ", historyError);
+      for (String diagString : originalDiagnostics) {
+        assertTrue(historyError.contains(diagString));
+      }
+      assertTrue("No killed message in diagnostics",
+        historyError.contains(JobImpl.JOB_KILLED_DIAG));
+    } finally {
+      LOG.info("FINISHED testDiagnosticsForKilledJob");
+    }
+  }
+
   @Test(timeout = 50000)
   public void testScanningOldDirs() throws Exception {
     LOG.info("STARTING testScanningOldDirs");
     try {
       Configuration conf = new Configuration();
       conf.setClass(
-          CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+          NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
           MyResolver.class, DNSToSwitchMapping.class);
       RackResolver.init(conf);
       MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
@@ -591,6 +666,27 @@ public class TestJobHistoryParsing {
     }
   }
 
+  static class MRAppWithHistoryWithJobKilled extends MRAppWithHistory {
+
+    public MRAppWithHistoryWithJobKilled(int maps, int reduces,
+        boolean autoComplete, String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      if (attemptID.getTaskId().getId() == 0) {
+        getContext().getEventHandler().handle(
+            new JobEvent(attemptID.getTaskId().getJobId(),
+                JobEventType.JOB_KILL));
+      } else {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
+      }
+    }
+  }
+
   static class HistoryFileManagerForTest extends HistoryFileManager {
     void deleteJobFromJobListCache(HistoryFileInfo fileInfo) {
       jobListCache.delete(fileInfo);
@@ -613,7 +709,7 @@ public class TestJobHistoryParsing {
     try {
       Configuration conf = new Configuration();
       conf.setClass(
-          CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+          NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
           MyResolver.class, DNSToSwitchMapping.class);
 
       RackResolver.init(conf);
@@ -667,7 +763,7 @@ public class TestJobHistoryParsing {
       Configuration configuration = new Configuration();
       configuration
           .setClass(
-              CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+              NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
               MyResolver.class, DNSToSwitchMapping.class);
 
       RackResolver.init(configuration);
@@ -739,7 +835,7 @@ public class TestJobHistoryParsing {
     final org.apache.hadoop.mapreduce.TaskType taskType =
         org.apache.hadoop.mapreduce.TaskType.MAP;
     final TaskID[] tids = new TaskID[2];
-    JobID jid = new JobID("1", 1);
+    final JobID jid = new JobID("1", 1);
     tids[0] = new TaskID(jid, taskType, 0);
     tids[1] = new TaskID(jid, taskType, 1);
     Mockito.when(reader.getNextEvent()).thenAnswer(
@@ -758,6 +854,13 @@ public class TestJobHistoryParsing {
               tfe.setDatum(tfe.getDatum());
               return tfe;
             }
+            if (eventId < 5) {
+              JobUnsuccessfulCompletionEvent juce =
+                  new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0,
+                      "JOB_FAILED", Collections.singletonList(
+                          "Task failed: " + tids[0].toString()));
+              return juce;
+            }
             return null;
           }
         });
@@ -765,4 +868,22 @@ public class TestJobHistoryParsing {
     assertTrue("Task 0 not implicated",
         info.getErrorInfo().contains(tids[0].toString()));
   }
+
+  @Test
+  public void testFailedJobHistoryWithoutDiagnostics() throws Exception {
+    final Path histPath = new Path(getClass().getClassLoader().getResource(
+        "job_1393307629410_0001-1393307687476-user-Sleep+job-1393307723835-0-0-FAILED-default-1393307693920.jhist")
+        .getFile());
+    final FileSystem lfs = FileSystem.getLocal(new Configuration());
+    final FSDataInputStream fsdis = lfs.open(histPath);
+    try {
+      JobHistoryParser parser = new JobHistoryParser(fsdis);
+      JobInfo info = parser.parse();
+      assertEquals("History parsed jobId incorrectly",
+          info.getJobId(), JobID.forName("job_1393307629410_0001") );
+      assertEquals("Default diagnostics incorrect ", "", info.getErrorInfo());
+    } finally {
+      fsdis.close();
+    }
+  }
 }