You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wi...@apache.org on 2020/05/28 03:44:28 UTC
[hadoop] branch branch-3.3.0 updated: MAPREDUCE-7278. Speculative
execution behavior is observed even when mapreduce.map.speculative and
mapreduce.reduce.speculative are false
This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch branch-3.3.0
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3.0 by this push:
new 542c478 MAPREDUCE-7278. Speculative execution behavior is observed even when mapreduce.map.speculative and mapreduce.reduce.speculative are false
542c478 is described below
commit 542c478a8c0c3eb69b71129a163f68bbed494a72
Author: Wilfred Spiegelenburg <wi...@apache.org>
AuthorDate: Thu May 28 13:23:01 2020 +1000
MAPREDUCE-7278. Speculative execution behavior is observed even when mapreduce.map.speculative and mapreduce.reduce.speculative are false
Contributed by Tarun Parimi.
(cherry picked from commit 10db97df1c8562a9e29b00e60d5bde1773c09188)
---
.../hadoop/mapreduce/v2/app/job/impl/TaskImpl.java | 25 +++--
.../mapreduce/v2/TestSpeculativeExecution.java | 107 ++++++++++++++++++++-
2 files changed, 124 insertions(+), 8 deletions(-)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
index ce3b3cc..b34cd7f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -142,7 +143,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
private boolean historyTaskStartGenerated = false;
// Launch time reported in history events.
private long launchTime;
-
+ private boolean speculationEnabled = false;
+
private static final SingleArcTransition<TaskImpl, TaskEvent>
ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
private static final SingleArcTransition<TaskImpl, TaskEvent>
@@ -325,6 +327,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
this.appContext = appContext;
this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
+ this.speculationEnabled = taskType.equals(TaskType.MAP) ?
+ conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false) :
+ conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
// This "this leak" is okay because the retained pointer is in an
// instance variable.
@@ -1079,13 +1084,19 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
if (task.successfulAttempt == null) {
boolean shouldAddNewAttempt = true;
if (task.inProgressAttempts.size() > 0) {
- // if not all of the inProgressAttempts are hanging for resource
- for (TaskAttemptId attemptId : task.inProgressAttempts) {
- if (((TaskAttemptImpl) task.getAttempt(attemptId))
- .isContainerAssigned()) {
- shouldAddNewAttempt = false;
- break;
+ if(task.speculationEnabled) {
+ // Add a new attempt only if no in-progress attempt has a container yet
+ for (TaskAttemptId attemptId : task.inProgressAttempts) {
+ if (((TaskAttemptImpl) task.getAttempt(attemptId))
+ .isContainerAssigned()) {
+ shouldAddNewAttempt = false;
+ break;
+ }
}
+ } else {
+ // No need to add new attempt if there are in progress attempts
+ // when speculation is false
+ shouldAddNewAttempt = false;
}
}
if (shouldAddNewAttempt) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java
index fe21f07..8527dc3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java
@@ -192,6 +192,46 @@ public class TestSpeculativeExecution {
}
}
+ public static class FailOnceMapper extends
+ Mapper<Object, Text, Text, IntWritable> {
+
+ public void map(Object key, Text value, Context context)
+ throws IOException, InterruptedException {
+ TaskAttemptID taid = context.getTaskAttemptID();
+ try{
+ Thread.sleep(2000);
+ } catch(InterruptedException ie) {
+ // Ignore
+ }
+ // Fail mapper only for first attempt
+ if (taid.getId() == 0) {
+ throw new RuntimeException("Failing this mapper");
+ }
+
+ context.write(value, new IntWritable(1));
+ }
+ }
+
+ public static class FailOnceReducer extends
+ Reducer<Text, IntWritable, Text, IntWritable> {
+
+ public void reduce(Text key, Iterable<IntWritable> values,
+ Context context) throws IOException, InterruptedException {
+ TaskAttemptID taid = context.getTaskAttemptID();
+ try{
+ Thread.sleep(2000);
+ } catch(InterruptedException ie) {
+ // Ignore
+ }
+ // Fail reduce only for first attempt
+ if (taid.getId() == 0) {
+ throw new RuntimeException("Failing this reducer");
+ }
+ context.write(key, new IntWritable(0));
+ }
+ }
+
+
@Test
public void testSpeculativeExecution() throws Exception {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
@@ -218,6 +258,30 @@ public class TestSpeculativeExecution {
Assert.assertEquals(0, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
.getValue());
+
+ /*------------------------------------------------------------------
+ * Test that Map/Red does not speculate if MAP_SPECULATIVE and
+ * REDUCE_SPECULATIVE are both false. When map tasks fail once and time out,
+ * we shouldn't launch two simultaneous attempts. MAPREDUCE-7278
+ * -----------------------------------------------------------------
+ */
+ job = runNonSpecFailOnceTest();
+
+ succeeded = job.waitForCompletion(true);
+ Assert.assertTrue(succeeded);
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+ counters = job.getCounters();
+ // 4 launches per type: each of the 2 tasks fails once and is relaunched
+ Assert.assertEquals(4,
+ counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue());
+ Assert.assertEquals(4,
+ counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
+ // Ensure no maps or reduces killed due to accidental speculation
+ Assert.assertEquals(0,
+ counters.findCounter(JobCounter.NUM_KILLED_MAPS).getValue());
+ Assert.assertEquals(0,
+ counters.findCounter(JobCounter.NUM_KILLED_REDUCES).getValue());
+
/*----------------------------------------------------------------------
* Test that Mapper speculates if MAP_SPECULATIVE is true and
* REDUCE_SPECULATIVE is false.
@@ -295,7 +359,48 @@ public class TestSpeculativeExecution {
// Delete output directory if it exists.
try {
- localFs.delete(TEST_OUT_DIR,true);
+ localFs.delete(TEST_OUT_DIR, true);
+ } catch (IOException e) {
+ // ignore
+ }
+
+ // Creates the Job Configuration
+ job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+ job.setMaxMapAttempts(2);
+
+ job.submit();
+
+ return job;
+ }
+
+ private Job runNonSpecFailOnceTest()
+ throws IOException, ClassNotFoundException, InterruptedException {
+
+ Path first = createTempFile("specexec_map_input1", "a\nz");
+ Path secnd = createTempFile("specexec_map_input2", "a\nz");
+
+ Configuration conf = mrCluster.getConfig();
+ conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+ conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+ // Prevent blacklisting since tasks fail once
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, false);
+ // Setting small task exit timeout values reproduces MAPREDUCE-7278
+ conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 20);
+ conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10);
+ Job job = Job.getInstance(conf);
+ job.setJarByClass(TestSpeculativeExecution.class);
+ job.setMapperClass(FailOnceMapper.class);
+ job.setReducerClass(FailOnceReducer.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+ job.setNumReduceTasks(2);
+ FileInputFormat.setInputPaths(job, first);
+ FileInputFormat.addInputPath(job, secnd);
+ FileOutputFormat.setOutputPath(job, TEST_OUT_DIR);
+
+ // Delete output directory if it exists.
+ try {
+ localFs.delete(TEST_OUT_DIR, true);
} catch (IOException e) {
// ignore
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org