You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2015/05/04 22:40:29 UTC

hadoop git commit: MAPREDUCE-6259. IllegalArgumentException due to missing job submit time. Contributed by zhihai xu (cherry picked from commit bf70c5ae2824a9139c1aa9d7c14020018881cec2)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 8efcf5bdb -> 81f128f29


MAPREDUCE-6259. IllegalArgumentException due to missing job submit time. Contributed by zhihai xu
(cherry picked from commit bf70c5ae2824a9139c1aa9d7c14020018881cec2)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/81f128f2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/81f128f2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/81f128f2

Branch: refs/heads/branch-2
Commit: 81f128f293ad12f154e66776fa3ce2af929523d2
Parents: 8efcf5b
Author: Jason Lowe <jl...@apache.org>
Authored: Mon May 4 20:39:18 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon May 4 20:40:16 2015 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 ++
 .../jobhistory/JobHistoryEventHandler.java      | 15 ++++--
 .../hadoop/mapreduce/v2/app/MRAppMaster.java    |  4 +-
 .../jobhistory/TestJobHistoryEventHandler.java  | 57 +++++++++++++++++---
 .../mapreduce/jobhistory/AMStartedEvent.java    | 16 ++++--
 5 files changed, 77 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f128f2/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index a7893d6..5d605a7 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -145,6 +145,9 @@ Release 2.7.1 - UNRELEASED
     MAPREDUCE-6339. Job history file is not flushed correctly because isTimerActive 
     flag is not set true when flushTimerTask is scheduled. (zhihai xu via devaraj)
 
+    MAPREDUCE-6259. IllegalArgumentException due to missing job submit time
+    (zhihai xu via jlowe)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f128f2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 6b0ea79..bf32888 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -426,10 +426,10 @@ public class JobHistoryEventHandler extends AbstractService
    * This should be the first call to history for a job
    * 
    * @param jobId the jobId.
-   * @param forcedJobStateOnShutDown
+   * @param amStartedEvent source of forced job state and start/submit times
    * @throws IOException
    */
-  protected void setupEventWriter(JobId jobId, String forcedJobStateOnShutDown)
+  protected void setupEventWriter(JobId jobId, AMStartedEvent amStartedEvent)
       throws IOException {
     if (stagingDirPath == null) {
       LOG.error("Log Directory is null, returning");
@@ -489,8 +489,13 @@ public class JobHistoryEventHandler extends AbstractService
     }
 
     MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer,
-        user, jobName, jobId, forcedJobStateOnShutDown, queueName);
+        user, jobName, jobId, amStartedEvent.getForcedJobStateOnShutDown(),
+        queueName);
     fi.getJobSummary().setJobId(jobId);
+    fi.getJobSummary().setJobLaunchTime(amStartedEvent.getStartTime());
+    fi.getJobSummary().setJobSubmitTime(amStartedEvent.getSubmitTime());
+    fi.getJobIndexInfo().setJobStartTime(amStartedEvent.getStartTime());
+    fi.getJobIndexInfo().setSubmitTime(amStartedEvent.getSubmitTime());
     fileMap.put(jobId, fi);
   }
 
@@ -541,8 +546,7 @@ public class JobHistoryEventHandler extends AbstractService
         try {
           AMStartedEvent amStartedEvent =
               (AMStartedEvent) event.getHistoryEvent();
-          setupEventWriter(event.getJobID(),
-              amStartedEvent.getForcedJobStateOnShutDown());
+          setupEventWriter(event.getJobID(), amStartedEvent);
         } catch (IOException ioe) {
           LOG.error("Error JobHistoryEventHandler in handleEvent: " + event,
               ioe);
@@ -982,6 +986,7 @@ public class JobHistoryEventHandler extends AbstractService
         tEvent.addEventInfo("NODE_MANAGER_HTTP_PORT",
                 ase.getNodeManagerHttpPort());
         tEvent.addEventInfo("START_TIME", ase.getStartTime());
+        tEvent.addEventInfo("SUBMIT_TIME", ase.getSubmitTime());
         tEntity.addEvent(tEvent);
         tEntity.setEntityId(jobId.toString());
         tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f128f2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 975f8c3..1868b98 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -1033,7 +1033,7 @@ public class MRAppMaster extends CompositeService {
           new JobHistoryEvent(job.getID(), new AMStartedEvent(info
               .getAppAttemptId(), info.getStartTime(), info.getContainerId(),
               info.getNodeManagerHost(), info.getNodeManagerPort(), info
-                  .getNodeManagerHttpPort())));
+                  .getNodeManagerHttpPort(), appSubmitTime)));
     }
 
     // Send out an MR AM inited event for this AM.
@@ -1042,7 +1042,7 @@ public class MRAppMaster extends CompositeService {
             .getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(),
             amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo
                 .getNodeManagerHttpPort(), this.forcedState == null ? null
-                    : this.forcedState.toString())));
+                    : this.forcedState.toString(), appSubmitTime)));
     amInfos.add(amInfo);
 
     // metrics system init is really init & start.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f128f2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 49be35b..2b07efb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -125,7 +125,7 @@ public class TestJobHistoryEventHandler {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
       mockWriter = jheh.getEventWriter();
       verify(mockWriter).write(any(HistoryEvent.class));
 
@@ -168,7 +168,7 @@ public class TestJobHistoryEventHandler {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
       mockWriter = jheh.getEventWriter();
       verify(mockWriter).write(any(HistoryEvent.class));
 
@@ -213,7 +213,7 @@ public class TestJobHistoryEventHandler {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
       mockWriter = jheh.getEventWriter();
       verify(mockWriter).write(any(HistoryEvent.class));
 
@@ -256,7 +256,7 @@ public class TestJobHistoryEventHandler {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
       mockWriter = jheh.getEventWriter();
       verify(mockWriter).write(any(HistoryEvent.class));
 
@@ -293,7 +293,7 @@ public class TestJobHistoryEventHandler {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
       verify(jheh, times(0)).processDoneFiles(any(JobId.class));
 
       handleEvent(jheh, new JobHistoryEvent(t.jobId,
@@ -338,7 +338,7 @@ public class TestJobHistoryEventHandler {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+        t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
       verify(jheh, times(0)).processDoneFiles(t.jobId);
 
       // skip processing done files
@@ -395,7 +395,7 @@ public class TestJobHistoryEventHandler {
     try {
       jheh.start();
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
 
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
           TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
@@ -441,6 +441,47 @@ public class TestJobHistoryEventHandler {
         pathStr);
   }
 
+  // Verify AMStartedEvent submit/start times reach job index info and summary
+  @Test (timeout=50000)
+  public void testAMStartedEvent() throws Exception {
+    TestParams t = new TestParams();
+    Configuration conf = new Configuration();
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, 100)));
+
+      JobHistoryEventHandler.MetaInfo mi =
+          JobHistoryEventHandler.fileMap.get(t.jobId);
+      Assert.assertEquals(mi.getJobIndexInfo().getSubmitTime(), 100);
+      Assert.assertEquals(mi.getJobIndexInfo().getJobStartTime(), 200);
+      Assert.assertEquals(mi.getJobSummary().getJobSubmitTime(), 100);
+      Assert.assertEquals(mi.getJobSummary().getJobLaunchTime(), 200);
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId,
+        new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+          0, 0, JobStateInternal.FAILED.toString())));
+
+      Assert.assertEquals(mi.getJobIndexInfo().getSubmitTime(), 100);
+      Assert.assertEquals(mi.getJobIndexInfo().getJobStartTime(), 200);
+      Assert.assertEquals(mi.getJobSummary().getJobSubmitTime(), 100);
+      Assert.assertEquals(mi.getJobSummary().getJobLaunchTime(), 200);
+      verify(jheh, times(1)).processDoneFiles(t.jobId);
+
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter, times(2)).write(any(HistoryEvent.class));
+    } finally {
+      jheh.stop();
+    }
+  }
+
   // Have JobHistoryEventHandler handle some events and make sure they get
   // stored to the Timeline store
   @Test (timeout=50000)
@@ -463,7 +504,7 @@ public class TestJobHistoryEventHandler {
               .getTimelineStore();
 
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
-              t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000),
+              t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1),
               currentTime - 10));
       TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
               null, null, null, null, null, null, null);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f128f2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
index d1a378b..e1465f5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
@@ -35,6 +35,7 @@ import org.apache.avro.util.Utf8;
 public class AMStartedEvent implements HistoryEvent {
   private AMStarted datum = new AMStarted();
   private String forcedJobStateOnShutDown;
+  private long submitTime;
 
   /**
    * Create an event to record the start of an MR AppMaster
@@ -54,9 +55,9 @@ public class AMStartedEvent implements HistoryEvent {
    */
   public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
       ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
-      int nodeManagerHttpPort) {
+      int nodeManagerHttpPort, long submitTime) {
     this(appAttemptId, startTime, containerId, nodeManagerHost,
-        nodeManagerPort, nodeManagerHttpPort, null);
+        nodeManagerPort, nodeManagerHttpPort, null, submitTime);
   }
 
   /**
@@ -79,7 +80,8 @@ public class AMStartedEvent implements HistoryEvent {
    */
   public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
       ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
-      int nodeManagerHttpPort, String forcedJobStateOnShutDown) {
+      int nodeManagerHttpPort, String forcedJobStateOnShutDown,
+      long submitTime) {
     datum.applicationAttemptId = new Utf8(appAttemptId.toString());
     datum.startTime = startTime;
     datum.containerId = new Utf8(containerId.toString());
@@ -87,6 +89,7 @@ public class AMStartedEvent implements HistoryEvent {
     datum.nodeManagerPort = nodeManagerPort;
     datum.nodeManagerHttpPort = nodeManagerHttpPort;
     this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
+    this.submitTime = submitTime;
   }
 
   AMStartedEvent() {
@@ -150,6 +153,13 @@ public class AMStartedEvent implements HistoryEvent {
     return this.forcedJobStateOnShutDown;
   }
 
+  /**
+   * @return the submit time for the Application (Job)
+   */
+  public long getSubmitTime() {
+    return this.submitTime;
+  }
+
   /** Get the attempt id */
 
   @Override