You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2015/05/04 22:40:29 UTC
hadoop git commit: MAPREDUCE-6259. IllegalArgumentException due to
missing job submit time. Contributed by zhihai xu (cherry picked from commit
bf70c5ae2824a9139c1aa9d7c14020018881cec2)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 8efcf5bdb -> 81f128f29
MAPREDUCE-6259. IllegalArgumentException due to missing job submit time. Contributed by zhihai xu
(cherry picked from commit bf70c5ae2824a9139c1aa9d7c14020018881cec2)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/81f128f2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/81f128f2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/81f128f2
Branch: refs/heads/branch-2
Commit: 81f128f293ad12f154e66776fa3ce2af929523d2
Parents: 8efcf5b
Author: Jason Lowe <jl...@apache.org>
Authored: Mon May 4 20:39:18 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon May 4 20:40:16 2015 +0000
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 ++
.../jobhistory/JobHistoryEventHandler.java | 15 ++++--
.../hadoop/mapreduce/v2/app/MRAppMaster.java | 4 +-
.../jobhistory/TestJobHistoryEventHandler.java | 57 +++++++++++++++++---
.../mapreduce/jobhistory/AMStartedEvent.java | 16 ++++--
5 files changed, 77 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f128f2/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index a7893d6..5d605a7 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -145,6 +145,9 @@ Release 2.7.1 - UNRELEASED
MAPREDUCE-6339. Job history file is not flushed correctly because isTimerActive
flag is not set true when flushTimerTask is scheduled. (zhihai xu via devaraj)
+ MAPREDUCE-6259. IllegalArgumentException due to missing job submit time
+ (zhihai xu via jlowe)
+
Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f128f2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 6b0ea79..bf32888 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -426,10 +426,10 @@ public class JobHistoryEventHandler extends AbstractService
* This should be the first call to history for a job
*
* @param jobId the jobId.
- * @param forcedJobStateOnShutDown
+ * @param amStartedEvent
* @throws IOException
*/
- protected void setupEventWriter(JobId jobId, String forcedJobStateOnShutDown)
+ protected void setupEventWriter(JobId jobId, AMStartedEvent amStartedEvent)
throws IOException {
if (stagingDirPath == null) {
LOG.error("Log Directory is null, returning");
@@ -489,8 +489,13 @@ public class JobHistoryEventHandler extends AbstractService
}
MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer,
- user, jobName, jobId, forcedJobStateOnShutDown, queueName);
+ user, jobName, jobId, amStartedEvent.getForcedJobStateOnShutDown(),
+ queueName);
fi.getJobSummary().setJobId(jobId);
+ fi.getJobSummary().setJobLaunchTime(amStartedEvent.getStartTime());
+ fi.getJobSummary().setJobSubmitTime(amStartedEvent.getSubmitTime());
+ fi.getJobIndexInfo().setJobStartTime(amStartedEvent.getStartTime());
+ fi.getJobIndexInfo().setSubmitTime(amStartedEvent.getSubmitTime());
fileMap.put(jobId, fi);
}
@@ -541,8 +546,7 @@ public class JobHistoryEventHandler extends AbstractService
try {
AMStartedEvent amStartedEvent =
(AMStartedEvent) event.getHistoryEvent();
- setupEventWriter(event.getJobID(),
- amStartedEvent.getForcedJobStateOnShutDown());
+ setupEventWriter(event.getJobID(), amStartedEvent);
} catch (IOException ioe) {
LOG.error("Error JobHistoryEventHandler in handleEvent: " + event,
ioe);
@@ -982,6 +986,7 @@ public class JobHistoryEventHandler extends AbstractService
tEvent.addEventInfo("NODE_MANAGER_HTTP_PORT",
ase.getNodeManagerHttpPort());
tEvent.addEventInfo("START_TIME", ase.getStartTime());
+ tEvent.addEventInfo("SUBMIT_TIME", ase.getSubmitTime());
tEntity.addEvent(tEvent);
tEntity.setEntityId(jobId.toString());
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f128f2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 975f8c3..1868b98 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -1033,7 +1033,7 @@ public class MRAppMaster extends CompositeService {
new JobHistoryEvent(job.getID(), new AMStartedEvent(info
.getAppAttemptId(), info.getStartTime(), info.getContainerId(),
info.getNodeManagerHost(), info.getNodeManagerPort(), info
- .getNodeManagerHttpPort())));
+ .getNodeManagerHttpPort(), appSubmitTime)));
}
// Send out an MR AM inited event for this AM.
@@ -1042,7 +1042,7 @@ public class MRAppMaster extends CompositeService {
.getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(),
amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo
.getNodeManagerHttpPort(), this.forcedState == null ? null
- : this.forcedState.toString())));
+ : this.forcedState.toString(), appSubmitTime)));
amInfos.add(amInfo);
// metrics system init is really init & start.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f128f2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 49be35b..2b07efb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -125,7 +125,7 @@ public class TestJobHistoryEventHandler {
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
- t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+ t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
mockWriter = jheh.getEventWriter();
verify(mockWriter).write(any(HistoryEvent.class));
@@ -168,7 +168,7 @@ public class TestJobHistoryEventHandler {
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
- t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+ t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
mockWriter = jheh.getEventWriter();
verify(mockWriter).write(any(HistoryEvent.class));
@@ -213,7 +213,7 @@ public class TestJobHistoryEventHandler {
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
- t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+ t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
mockWriter = jheh.getEventWriter();
verify(mockWriter).write(any(HistoryEvent.class));
@@ -256,7 +256,7 @@ public class TestJobHistoryEventHandler {
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
- t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+ t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
mockWriter = jheh.getEventWriter();
verify(mockWriter).write(any(HistoryEvent.class));
@@ -293,7 +293,7 @@ public class TestJobHistoryEventHandler {
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
- t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+ t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
verify(jheh, times(0)).processDoneFiles(any(JobId.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId,
@@ -338,7 +338,7 @@ public class TestJobHistoryEventHandler {
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
- t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+ t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
verify(jheh, times(0)).processDoneFiles(t.jobId);
// skip processing done files
@@ -395,7 +395,7 @@ public class TestJobHistoryEventHandler {
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
- t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+ t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
@@ -441,6 +441,47 @@ public class TestJobHistoryEventHandler {
pathStr);
}
+ // test AMStartedEvent for submitTime and startTime
+ @Test (timeout=50000)
+ public void testAMStartedEvent() throws Exception {
+ TestParams t = new TestParams();
+ Configuration conf = new Configuration();
+
+ JHEvenHandlerForTest realJheh =
+ new JHEvenHandlerForTest(t.mockAppContext, 0);
+ JHEvenHandlerForTest jheh = spy(realJheh);
+ jheh.init(conf);
+
+ EventWriter mockWriter = null;
+ try {
+ jheh.start();
+ handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+ t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, 100)));
+
+ JobHistoryEventHandler.MetaInfo mi =
+ JobHistoryEventHandler.fileMap.get(t.jobId);
+ Assert.assertEquals(mi.getJobIndexInfo().getSubmitTime(), 100);
+ Assert.assertEquals(mi.getJobIndexInfo().getJobStartTime(), 200);
+ Assert.assertEquals(mi.getJobSummary().getJobSubmitTime(), 100);
+ Assert.assertEquals(mi.getJobSummary().getJobLaunchTime(), 200);
+
+ handleEvent(jheh, new JobHistoryEvent(t.jobId,
+ new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
+ 0, 0, JobStateInternal.FAILED.toString())));
+
+ Assert.assertEquals(mi.getJobIndexInfo().getSubmitTime(), 100);
+ Assert.assertEquals(mi.getJobIndexInfo().getJobStartTime(), 200);
+ Assert.assertEquals(mi.getJobSummary().getJobSubmitTime(), 100);
+ Assert.assertEquals(mi.getJobSummary().getJobLaunchTime(), 200);
+ verify(jheh, times(1)).processDoneFiles(t.jobId);
+
+ mockWriter = jheh.getEventWriter();
+ verify(mockWriter, times(2)).write(any(HistoryEvent.class));
+ } finally {
+ jheh.stop();
+ }
+ }
+
// Have JobHistoryEventHandler handle some events and make sure they get
// stored to the Timeline store
@Test (timeout=50000)
@@ -463,7 +504,7 @@ public class TestJobHistoryEventHandler {
.getTimelineStore();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
- t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000),
+ t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1),
currentTime - 10));
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
null, null, null, null, null, null, null);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f128f2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
index d1a378b..e1465f5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
@@ -35,6 +35,7 @@ import org.apache.avro.util.Utf8;
public class AMStartedEvent implements HistoryEvent {
private AMStarted datum = new AMStarted();
private String forcedJobStateOnShutDown;
+ private long submitTime;
/**
* Create an event to record the start of an MR AppMaster
@@ -54,9 +55,9 @@ public class AMStartedEvent implements HistoryEvent {
*/
public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
- int nodeManagerHttpPort) {
+ int nodeManagerHttpPort, long submitTime) {
this(appAttemptId, startTime, containerId, nodeManagerHost,
- nodeManagerPort, nodeManagerHttpPort, null);
+ nodeManagerPort, nodeManagerHttpPort, null, submitTime);
}
/**
@@ -79,7 +80,8 @@ public class AMStartedEvent implements HistoryEvent {
*/
public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
- int nodeManagerHttpPort, String forcedJobStateOnShutDown) {
+ int nodeManagerHttpPort, String forcedJobStateOnShutDown,
+ long submitTime) {
datum.applicationAttemptId = new Utf8(appAttemptId.toString());
datum.startTime = startTime;
datum.containerId = new Utf8(containerId.toString());
@@ -87,6 +89,7 @@ public class AMStartedEvent implements HistoryEvent {
datum.nodeManagerPort = nodeManagerPort;
datum.nodeManagerHttpPort = nodeManagerHttpPort;
this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
+ this.submitTime = submitTime;
}
AMStartedEvent() {
@@ -150,6 +153,13 @@ public class AMStartedEvent implements HistoryEvent {
return this.forcedJobStateOnShutDown;
}
+ /**
+ * @return the submit time for the Application(Job)
+ */
+ public long getSubmitTime() {
+ return this.submitTime;
+ }
+
/** Get the attempt id */
@Override