You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zx...@apache.org on 2015/07/31 08:08:21 UTC
hadoop git commit: MAPREDUCE-6433. launchTime may be negative.
Contributed by Zhihai Xu
Repository: hadoop
Updated Branches:
refs/heads/trunk ab80e2770 -> 93d50b782
MAPREDUCE-6433. launchTime may be negative. Contributed by Zhihai Xu
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/93d50b78
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/93d50b78
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/93d50b78
Branch: refs/heads/trunk
Commit: 93d50b782494af7eef980c4d596a59ff4e11646e
Parents: ab80e27
Author: Zhihai Xu <zx...@apache.org>
Authored: Thu Jul 30 23:07:31 2015 -0700
Committer: Zhihai Xu <zx...@apache.org>
Committed: Thu Jul 30 23:07:31 2015 -0700
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 2 +
.../hadoop/mapreduce/v2/app/MRAppMaster.java | 2 +-
.../v2/app/job/event/JobStartEvent.java | 2 +-
.../mapreduce/v2/app/job/impl/JobImpl.java | 2 +-
.../mapreduce/v2/app/TestMRAppMaster.java | 88 +++++++++++++++++++-
.../mapreduce/jobhistory/EventWriter.java | 19 ++++-
6 files changed, 107 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d50b78/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 398ffc6..738dea5 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -542,6 +542,8 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6427. Fix typo in JobHistoryEventHandler. (Ray Chiang via cdouglas)
+ MAPREDUCE-6433. launchTime may be negative. (Zhihai Xu)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d50b78/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index f199ecb..6dc830f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -233,7 +233,7 @@ public class MRAppMaster extends CompositeService {
JobStateInternal forcedState = null;
private final ScheduledExecutorService logSyncer;
- private long recoveredJobStartTime = 0;
+ private long recoveredJobStartTime = -1L;
private static boolean mainStarted = false;
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d50b78/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java
index 39051da..a142c31 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java
@@ -25,7 +25,7 @@ public class JobStartEvent extends JobEvent {
long recoveredJobStartTime;
public JobStartEvent(JobId jobID) {
- this(jobID, 0);
+ this(jobID, -1L);
}
public JobStartEvent(JobId jobID, long recoveredJobStartTime) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d50b78/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 4c3b3fe..9d141eb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -1629,7 +1629,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
@Override
public void transition(JobImpl job, JobEvent event) {
JobStartEvent jse = (JobStartEvent) event;
- if (jse.getRecoveredJobStartTime() != 0) {
+ if (jse.getRecoveredJobStartTime() != -1L) {
job.startTime = jse.getRecoveredJobStartTime();
} else {
job.startTime = job.clock.getTime();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d50b78/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
index 63b201d..9e0dafc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
@@ -31,9 +31,11 @@ import static org.mockito.Mockito.times;
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collections;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.HashMap;
import java.util.Map;
@@ -44,16 +46,21 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
+import org.apache.hadoop.mapreduce.jobhistory.EventWriter;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
@@ -61,6 +68,8 @@ import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.AccessControlException;
@@ -111,7 +120,7 @@ public class TestMRAppMaster {
}
dir.mkdirs();
}
-
+
@Test
public void testMRAppMasterForDifferentUser() throws IOException,
InterruptedException {
@@ -170,7 +179,46 @@ public class TestMRAppMaster {
// verify the final status is FAILED
verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED");
}
-
+
+ @Test
+ public void testMRAppMasterJobLaunchTime() throws IOException,
+ InterruptedException {
+ String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+ String containerIdStr = "container_1317529182569_0004_000002_1";
+ String userName = "TestAppMasterUser";
+ JobConf conf = new JobConf();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+ conf.set(JHAdminConfig.MR_HS_JHIST_FORMAT, "json");
+ ApplicationAttemptId applicationAttemptId = ConverterUtils
+ .toApplicationAttemptId(applicationAttemptIdStr);
+ JobId jobId = TypeConverter.toYarn(
+ TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+
+ File dir = new File(MRApps.getStagingAreaDir(conf, userName).toString(),
+ jobId.toString());
+ dir.mkdirs();
+ File historyFile = new File(JobHistoryUtils.getStagingJobHistoryFile(
+ new Path(dir.toURI().toString()), jobId,
+ (applicationAttemptId.getAttemptId() - 1)).toUri().getRawPath());
+ historyFile.createNewFile();
+ FSDataOutputStream out = new FSDataOutputStream(
+ new FileOutputStream(historyFile), null);
+ EventWriter writer = new EventWriter(out, EventWriter.WriteMode.JSON);
+ writer.close();
+ FileSystem fs = FileSystem.get(conf);
+ JobSplitWriter.createSplitFiles(new Path(dir.getAbsolutePath()), conf,
+ fs, new org.apache.hadoop.mapred.InputSplit[0]);
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ MRAppMasterTestLaunchTime appMaster =
+ new MRAppMasterTestLaunchTime(applicationAttemptId, containerId,
+ "host", -1, -1, System.currentTimeMillis());
+ MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+ appMaster.stop();
+ assertTrue("Job launch time should not be negative.",
+ appMaster.jobLaunchTime.get() >= 0);
+ }
+
@Test
public void testMRAppMasterSuccessLock() throws IOException,
InterruptedException {
@@ -585,3 +633,39 @@ class MRAppMasterTest extends MRAppMaster {
return spyHistoryService;
}
}
+
+class MRAppMasterTestLaunchTime extends MRAppMasterTest {
+ final AtomicLong jobLaunchTime = new AtomicLong(0L);
+ public MRAppMasterTestLaunchTime(ApplicationAttemptId applicationAttemptId,
+ ContainerId containerId, String host, int port, int httpPort,
+ long submitTime) {
+ super(applicationAttemptId, containerId, host, port, httpPort,
+ submitTime, false, false);
+ }
+
+ @Override
+ protected EventHandler<CommitterEvent> createCommitterEventHandler(
+ AppContext context, OutputCommitter committer) {
+ return new CommitterEventHandler(context, committer,
+ getRMHeartbeatHandler()) {
+ @Override
+ public void handle(CommitterEvent event) {
+ }
+ };
+ }
+
+ @Override
+ protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+ AppContext context) {
+ return new JobHistoryEventHandler(context, getStartCount()) {
+ @Override
+ public void handle(JobHistoryEvent event) {
+ if (event.getHistoryEvent().getEventType() == EventType.JOB_INITED) {
+ JobInitedEvent jie = (JobInitedEvent) event.getHistoryEvent();
+ jobLaunchTime.set(jie.getLaunchTime());
+ }
+ super.handle(event);
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d50b78/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
index 29489a5..b1cb6dc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
@@ -29,19 +29,25 @@ import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Event Writer is an utility class used to write events to the underlying
* stream. Typically, one event writer (which translates to one stream)
* is created per job
*
*/
-class EventWriter {
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class EventWriter {
static final String VERSION = "Avro-Json";
static final String VERSION_BINARY = "Avro-Binary";
@@ -50,11 +56,17 @@ class EventWriter {
new SpecificDatumWriter<Event>(Event.class);
private Encoder encoder;
private static final Log LOG = LogFactory.getLog(EventWriter.class);
+
+ /**
+ * avro encoding format supported by EventWriter.
+ */
public enum WriteMode { JSON, BINARY }
private final WriteMode writeMode;
private final boolean jsonOutput; // Cache value while we have 2 modes
- EventWriter(FSDataOutputStream out, WriteMode mode) throws IOException {
+ @VisibleForTesting
+ public EventWriter(FSDataOutputStream out, WriteMode mode)
+ throws IOException {
this.out = out;
this.writeMode = mode;
if (this.writeMode==WriteMode.JSON) {
@@ -93,7 +105,8 @@ class EventWriter {
out.hflush();
}
- void close() throws IOException {
+ @VisibleForTesting
+ public void close() throws IOException {
try {
encoder.flush();
out.close();